1) Minor fixes to the feed management console. 2) Added provision for specifying a (min, max) TPS range to the Twitter firehose adaptor.
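
A (min, max) TPS range lets the synthetic firehose vary its rate instead of using a
single fixed "tps" value. A hypothetical adaptor configuration exercising the new range:
the parameter names "tps-min", "tps-max", "tps", "tput-duration" and the "controlled"/
"aggressive" mode values come from TweetGenerator2/TwitterFirehoseFeedAdapter, while the
"mode" key string and the surrounding DDL are illustrative only:

    create feed dataset SyntheticTweets(TweetType)
    using twitter_firehose
    (("mode"="controlled"),("tps-min"="50"),("tps-max"="200"),("tput-duration"="5"));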
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
index a032db4..a89b6d5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -63,7 +63,6 @@
             }
         } else {
             obj = verifyIfFeedIsAlive(dataverseName, feedName, datasetName);
-            // do null check
         }
 
         PrintWriter out = response.getWriter();
@@ -75,7 +74,7 @@
         LinkedBlockingQueue<String> queue = FeedLifecycleListener.INSTANCE.getFeedReportQueue(feedId);
         String report = null;
         try {
-            report = queue.poll(30, TimeUnit.SECONDS);
+            report = queue.poll(5, TimeUnit.SECONDS);
         } catch (Exception e) {
             e.printStackTrace();
         }
@@ -83,35 +82,33 @@
     }
 
     private JSONObject verifyIfFeedIsAlive(String dataverseName, String feedName, String datasetName) {
-        JSONObject obj = null;
+        JSONObject obj = new JSONObject();
         try {
             MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
             List<FeedActivity> feedActivities = MetadataManager.INSTANCE
                     .getActiveFeeds(ctx, dataverseName, datasetName);
-            boolean active = false;
-            FeedActivity activity = null;
-            for (FeedActivity feedActivity : feedActivities) {
-                active = (feedActivity.getFeedName().equals(feedName) && feedActivity.getActivityType().equals(
-                        FeedActivityType.FEED_BEGIN));
-                if (active) {
-                    activity = feedActivity;
+            FeedConnectionId feedId = new FeedConnectionId(dataverseName, feedName, datasetName);
+            FeedActivity activity = MetadataManager.INSTANCE.getRecentActivityOnFeedConnection(ctx, feedId, null);
+            switch (activity.getActivityType()) {
+                case FEED_BEGIN:
+                    Map<String, String> activityDetails = activity.getFeedActivityDetails();
+                    String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
+                    String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
+                    String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
+                    obj.put("status", "active");
+                    obj.put("type", "reload");
+                    obj.put("ingestLocations", ingestLocations);
+                    obj.put("computeLocations", computeLocations);
+                    obj.put("storageLocations", storageLocations);
+                    System.out.println("Reloading: ingestion at " + ingestLocations + ", compute at "
+                            + computeLocations + ", storage at " + storageLocations);
                     break;
-                }
-            }
-            if (active) {
-                Map<String, String> activityDetails = activity.getFeedActivityDetails();
-                String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
-                String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
-                String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
-                obj = new JSONObject();
-                obj.put("type", "reload");
-                obj.put("ingestLocations", ingestLocations);
-                obj.put("computeLocations", computeLocations);
-                obj.put("storageLocations", storageLocations);
-                System.out.println(" RE LOADING " + " ingestion at " + ingestLocations + " compute at "
-                        + computeLocations + " storage at " + storageLocations);
-            } else {
-                obj = null;
+                case FEED_FAILURE:
+                    obj.put("status", "failed");
+                    break;
+                case FEED_END:
+                    obj.put("status", "ended");
+                    break;
             }
         } catch (Exception e) {
             // ignore
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
index 97a8576..a5f1813 100644
--- a/asterix-app/src/main/resources/feed/dashboard.html
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -5,7 +5,7 @@
     <script type="text/javascript">
     $(document).ready(function() {
         var feedSeries = new TimeSeries();
-      
+        var state = "ACTIVE";
         var dataverse = "%s";
         var dataset = "%s";
         var feed = "%s";
@@ -24,7 +24,7 @@
         var graphNames = new Array();
 
         $.ajaxSetup({ cache: false });
-        setInterval(fetchFeedReport, 500);
+        setInterval(fetchFeedReport, 5000);
         function fetchFeedReport() {
           $.ajax({
             url: '/feed/data?dataverse=' + dataverse + '&dataset=' + dataset + '&feed=' + feed,
@@ -36,26 +36,36 @@
 
          
         function onFeedReportReceived(data) {
-            var type = data["type"];
-            if (type == ("reload")) {
-              ingestLocations  = data["ingestLocations"];
-              computeLocations = data["computeLocations"];
-              storageLocations = data["storageLocations"];
+            var status = data["status"];
+            if (status == "ended") {
+              ingestLocations = " ";
+              computeLocations = " ";
+              storageLocations = " ";
+              ingestionPolicy = " ";
+              state = "INACTIVE";
               document.location.reload(true);
             } else {
-            var report = data["value"];
-            var tputArray = report.split("|");
-            var covered = 0;
-            var totalTput = 0;
-            for( var i = 0; i < tputArray.length-1; i ++){
-               ingestionTimeSeries[i].append(data["time"], tputArray[i]);
-               covered++;
-               totalTput += parseInt(tputArray[i]);
-            }          
-            for( var j = covered; j < numIngestionNodes; j++){
-                ingestionTimeSeries[j].append(data["time"], 0);
-            }          
-            ingestionTimeSeries[numIngestionNodes].append(data["time"], totalTput);
+                var type = data["type"];
+                if (type == "reload") {
+                    ingestLocations = data["ingestLocations"];
+                    computeLocations = data["computeLocations"];
+                    storageLocations = data["storageLocations"];
+                    document.location.reload(true);
+                } else {
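+                    // "value" is a '|'-delimited list of per-node throughputs in partition
+                    // order, e.g. "1050|980|1101|" (illustrative values)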
+                    var report = data["value"];
+                    var tputArray = report.split("|");
+                    var covered = 0;
+                    var totalTput = 0;
+                    for (var i = 0; i < tputArray.length - 1; i++) {
+                        ingestionTimeSeries[i].append(data["time"], tputArray[i]);
+                        covered++;
+                        totalTput += parseInt(tputArray[i]);
+                    }
+                    for (var j = covered; j < numIngestionNodes; j++) {
+                        ingestionTimeSeries[j].append(data["time"], 0);
+                    }
+                    ingestionTimeSeries[numIngestionNodes].append(data["time"], totalTput);
+                }
             }
         }
 
@@ -72,7 +82,7 @@
           document.write("<br />" + "Compute Locations: " + computeLocations);
           document.write("<br />" + "Storage Locations: " + storageLocations);
           document.write("<br />" + "Ingestion Policy: " + ingestionPolicy);
-          document.write("<br />" + "Active since" + activeSince);
+          document.write("<br />" + "Status: " + state);
           document.write("<br />");
           document.write("<br />");
     
@@ -103,7 +113,7 @@
         }   
 
         function drawChart(chartName, ingestionTimeSeries) {
-             var ingestionChart = new SmoothieChart({ minValue:0, millisPerPixel: 20, grid: { strokeStyle: '#555555', lineWidth: 1, millisPerLine: 1000, verticalSections: 10 }});
+             var ingestionChart = new SmoothieChart({ timestampFormatter:SmoothieChart.timeFormatter, interpolation:'linear', minValue:0, millisPerPixel: 20, grid: { strokeStyle: '#555555', lineWidth: 1, millisPerLine: 1000, verticalSections: 10 }});
              ingestionChart.addTimeSeries(ingestionTimeSeries, seriesOptions);
              ingestionChart.streamTo(document.getElementById(chartName, 500));
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index c06f884..a6c7c18 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -17,30 +17,70 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
+/**
+ * A wrapper around the standard frame writer provided to an operator node pushable.
+ * The wrapper monitors the flow of data from this operator to a downstream operator
+ * over a connector. It collects statistics if required by the feed ingestion policy
+ * and reports them to the Super Feed Manager chosen for the feed. In addition, any
+ * congestion experienced by the operator is also reported.
+ */
 public class FeedFrameWriter implements IFrameWriter {
 
     private static final Logger LOGGER = Logger.getLogger(FeedFrameWriter.class.getName());
 
+    /** The threshold time allowed for pushing a frame to the network. **/
+    public static final long FLUSH_THRESHOLD_TIME = 5000; // 5 seconds
+
+    /** Actual frame writer provided to an operator. **/
     private IFrameWriter writer;
 
+    /** The node pushable associated with the operator **/
     private IOperatorNodePushable nodePushable;
 
-    private final boolean collectStatistics;
+    /** Set to true if the operator's health needs to be monitored. **/
+    private final boolean reportHealth;
 
+    /** A buffer for keeping frames that are waiting to be processed **/
     private List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
 
+    /**
+     * Mode associated with the frame writer
+     * Possible values: FORWARD, STORE
+     * 
+     * @see Mode
+     */
     private Mode mode;
 
-    public static final long FLUSH_THRESHOLD_TIME = 5000;
+    /**
+     * Detects if the operator is unable to push a frame downstream
+     * within a threshold period of time. In addition, it measures the
+     * throughput as observed on the output channel of the associated operator.
+     */
+    private HealthMonitor healthMonitor;
 
-    private FramePushWait framePushWait;
-
+    /**
+     * Manages the scheduling of tasks.
+     */
     private Timer timer;
 
+    /**
+     * Provides access to the tuples in a frame. Used in collecting statistics.
+     */
     private FrameTupleAccessor fta;
 
     public enum Mode {
+        /**
+         * Normal mode of operation for an operator when
+         * frames are pushed to the downstream operator.
+         */
         FORWARD,
+
+        /**
+         * Failure mode of operation for an operator when
+         * input frames are not pushed to the downstream operator but
+         * are buffered for future retrieval.
+         */
         STORE
     }
 
@@ -50,13 +90,21 @@
         this.writer = writer;
         this.mode = Mode.FORWARD;
         this.nodePushable = nodePushable;
-        this.collectStatistics = policyEnforcer.getFeedPolicyAccessor().collectStatistics();
-        if (collectStatistics) {
+        this.reportHealth = policyEnforcer.getFeedPolicyAccessor().collectStatistics();
+        if (reportHealth) {
             timer = new Timer();
-            framePushWait = new FramePushWait(nodePushable, FLUSH_THRESHOLD_TIME, feedId, nodeId, feedRuntimeType,
-                    partition, FLUSH_THRESHOLD_TIME, timer);
-
-            timer.scheduleAtFixedRate(framePushWait, 0, FLUSH_THRESHOLD_TIME);
+            healthMonitor = new HealthMonitor(FLUSH_THRESHOLD_TIME, feedId, nodeId, feedRuntimeType, partition,
+                    FLUSH_THRESHOLD_TIME, timer);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Statistics collection enabled for the feed " + feedId + " " + feedRuntimeType + " ["
+                        + partition + "]");
+            }
+            timer.scheduleAtFixedRate(healthMonitor, 0, FLUSH_THRESHOLD_TIME);
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Statistics collection *not* enabled for the feed " + feedId + " " + feedRuntimeType + " ["
+                        + partition + "]");
+            }
         }
         this.fta = fta;
     }
@@ -75,33 +123,16 @@
         this.mode = newMode;
     }
 
-    public List<ByteBuffer> getStoredFrames() {
-        return frames;
-    }
-
-    public void clear() {
-        frames.clear();
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        writer.open();
-    }
-
-    public void reset() {
-        framePushWait.reset();
-    }
-
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         switch (mode) {
             case FORWARD:
                 try {
-                    if (collectStatistics) {
+                    if (reportHealth) {
                         fta.reset(buffer);
-                        framePushWait.notifyStart();
+                        healthMonitor.notifyStart();
                         writer.nextFrame(buffer);
-                        framePushWait.notifyFinish(fta.getTupleCount());
+                        healthMonitor.notifyFinish(fta.getTupleCount());
                     } else {
                         writer.nextFrame(buffer);
                     }
@@ -126,15 +157,17 @@
                 storageBuffer.put(buffer);
                 frames.add(storageBuffer);
                 storageBuffer.flip();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Stored frame for " + nodePushable);
+                }
                 break;
         }
     }
 
-    private static class FramePushWait extends TimerTask {
+    private static class HealthMonitor extends TimerTask {
 
         private long startTime = -1;
-        private IOperatorNodePushable nodePushable;
-        private State state;
+        private FramePushState state;
         private long flushThresholdTime;
         private static final String EOL = "\n";
         private FeedConnectionId feedId;
@@ -146,11 +179,10 @@
         private boolean collectThroughput;
         private FeedMessageService mesgService;
 
-        public FramePushWait(IOperatorNodePushable nodePushable, long flushThresholdTime, FeedConnectionId feedId,
-                String nodeId, FeedRuntimeType feedRuntimeType, int partition, long period, Timer timer) {
-            this.nodePushable = nodePushable;
+        public HealthMonitor(long flushThresholdTime, FeedConnectionId feedId, String nodeId,
+                FeedRuntimeType feedRuntimeType, int partition, long period, Timer timer) {
             this.flushThresholdTime = flushThresholdTime;
-            this.state = State.INTIALIZED;
+            this.state = FramePushState.INTIALIZED;
             this.feedId = feedId;
             this.nodeId = nodeId;
             this.feedRuntimeType = feedRuntimeType;
@@ -161,57 +193,49 @@
 
         public void notifyStart() {
             startTime = System.currentTimeMillis();
-            state = State.WAITING_FOR_FLUSH_COMPLETION;
-
+            state = FramePushState.WAITING_FOR_FLUSH_COMPLETION;
         }
 
+        /**
+         * Invoked when a live operator instance needs to take over from the
+         * zombie instance left behind by a previously failed execution.
+         */
         public void reset() {
             mesgService = null;
             collectThroughput = true;
         }
 
         public void notifyFinish(int numTuples) {
-            state = State.WAITNG_FOR_NEXT_FRAME;
+            state = FramePushState.WAITNG_FOR_NEXT_FRAME;
             numTuplesInInterval.set(numTuplesInInterval.get() + numTuples);
         }
 
         @Override
         public void run() {
-            if (state.equals(State.WAITING_FOR_FLUSH_COMPLETION)) {
+            if (state.equals(FramePushState.WAITING_FOR_FLUSH_COMPLETION)) {
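+                // a flush outstanding for longer than the threshold signals congestion downstream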
                 long currentTime = System.currentTimeMillis();
                 if (currentTime - startTime > flushThresholdTime) {
                     if (LOGGER.isLoggable(Level.SEVERE)) {
                         LOGGER.severe("Congestion reported by " + feedRuntimeType + " [" + partition + "]");
                     }
-                    sendReportToSFM(currentTime - startTime, FeedReportMessageType.CONGESTION);
+                    sendReportToSFM(currentTime - startTime, FeedReportMessageType.CONGESTION,
+                            System.currentTimeMillis());
                 }
             }
             if (collectThroughput) {
-                System.out.println(" NUMBER of TUPLES " + numTuplesInInterval.get() + " in  " + period
-                        + " for partition " + partition);
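+                // period is in milliseconds; scale the tuples seen in the interval to tuples/second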
                 int instantTput = (int) Math.ceil((((double) numTuplesInInterval.get() * 1000) / period));
-                sendReportToSFM(instantTput, FeedReportMessageType.THROUGHPUT);
+                sendReportToSFM(instantTput, FeedReportMessageType.THROUGHPUT, System.currentTimeMillis());
             }
             numTuplesInInterval.set(0);
         }
 
-        private void sendReportToSFM(long value, SuperFeedManager.FeedReportMessageType mesgType) {
-            String feedRep = feedId.getDataverse() + ":" + feedId.getFeedName() + ":" + feedId.getDatasetName();
-            String operator = "" + feedRuntimeType;
-            String message = mesgType.name().toLowerCase() + "|" + feedRep + "|" + operator + "|" + partition + "|"
-                    + value + "|" + nodeId + "|" + EOL;
+        private void sendReportToSFM(long value, SuperFeedManager.FeedReportMessageType mesgType, long timestamp) {
             if (mesgService == null) {
-                while (mesgService == null) {
-                    mesgService = FeedManager.INSTANCE.getFeedMessageService(feedId);
-                    if (mesgService == null) {
-                        try {
-                            Thread.sleep(2000);
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
+                waitTillMessageServiceIsUp();
             }
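+            // e.g. "throughput|MyDataverse:MyFeed:MyDataset|COMPUTE|2|1050|nc1|1381270293000|" + EOL (illustrative values)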
+            String feedRep = feedId.getDataverse() + ":" + feedId.getFeedName() + ":" + feedId.getDatasetName();
+            String message = mesgType.name().toLowerCase() + "|" + feedRep + "|" + feedRuntimeType + "|" + partition
+                    + "|" + value + "|" + nodeId + "|" + timestamp + "|" + EOL;
             try {
                 mesgService.sendMessage(message);
             } catch (IOException ioe) {
@@ -223,17 +247,43 @@
             }
         }
 
+        private void waitTillMessageServiceIsUp() {
+            while (mesgService == null) {
+                mesgService = FeedManager.INSTANCE.getFeedMessageService(feedId);
+                if (mesgService == null) {
+                    try {
+                        // wait for the message service to become available
+                        Thread.sleep(2000);
+                    } catch (InterruptedException e) {
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.warning("Interrupted while waiting for the message service: " + e);
+                        }
+                    }
+                }
+            }
+        }
+
         public void deactivate() {
+            this.cancel();
             collectThroughput = false;
         }
 
-        public void activate() {
-            collectThroughput = true;
-        }
-
-        private enum State {
+        private enum FramePushState {
+            /**
+             * Frame writer has been initialized
+             */
             INTIALIZED,
+
+            /**
+             * Frame writer is waiting for a pending flush to finish.
+             */
             WAITING_FOR_FLUSH_COMPLETION,
+
+            /**
+             * Frame writer is waiting to be given the next frame.
+             */
             WAITNG_FOR_NEXT_FRAME
         }
 
@@ -242,21 +292,19 @@
     @Override
     public void fail() throws HyracksDataException {
         writer.fail();
-        if (framePushWait != null && !framePushWait.feedRuntimeType.equals(FeedRuntimeType.INGESTION)) {
-            framePushWait.cancel();
-            framePushWait.deactivate();
+        if (healthMonitor != null && !healthMonitor.feedRuntimeType.equals(FeedRuntimeType.INGESTION)) {
+            healthMonitor.deactivate();
         }
-        framePushWait.reset();
+        if (healthMonitor != null) {
+            healthMonitor.reset();
+        }
     }
 
     @Override
     public void close() throws HyracksDataException {
-        if (framePushWait != null) {
+        if (healthMonitor != null) {
+            healthMonitor.deactivate();
             if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Closing frame statistics collection activity" + framePushWait);
+                LOGGER.info("Closing frame statistics collection activity " + healthMonitor);
             }
-            framePushWait.deactivate();
-            framePushWait.cancel();
         }
         writer.close();
     }
@@ -274,4 +322,21 @@
         return "MaterializingFrameWriter using " + writer;
     }
 
+    public List<ByteBuffer> getStoredFrames() {
+        return frames;
+    }
+
+    public void clear() {
+        frames.clear();
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        writer.open();
+    }
+
+    public void reset() {
+        healthMonitor.reset();
+    }
+
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
index efa6cbb..5c8c422 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorDescriptor.java
@@ -31,17 +31,25 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 
 /**
- * Operator responsible for ingesting data from an external source. This
- * operator uses a (configurable) adapter associated with the feed dataset.
+ * FeedIntakeOperatorDescriptor is responsible for ingesting data from an external source. This
+ * operator uses a user-specified or built-in adaptor for retrieving data from the external
+ * data source.
  */
 public class FeedIntakeOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(FeedIntakeOperatorDescriptor.class.getName());
 
+    /** The type associated with the ADM data output from the feed adaptor. */
     private final IAType atype;
+
+    /** A unique identifier for a feed instance. */
     private final FeedConnectionId feedId;
+
+    /** Map representation of the policy parameters. */
     private final Map<String, String> feedPolicy;
+
+    /** The adaptor factory used to create an instance of the feed adaptor. **/
     private IAdapterFactory adapterFactory;
 
     public FeedIntakeOperatorDescriptor(JobSpecification spec, FeedConnectionId feedId, IAdapterFactory adapterFactory,
@@ -73,6 +81,9 @@
                 }
             }
         } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.severe("Initialization of the feed adaptor failed");
+            }
             throw new HyracksDataException("initialization of adapter failed", e);
         }
         return new FeedIntakeOperatorNodePushable(ctx, feedId, adapter, feedPolicy, partition, ingestionRuntime);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 5ed4ccc..58615fc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -22,7 +22,6 @@
 
 import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
 import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
index 2e31f6d..40b2f26 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
@@ -9,6 +9,10 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+/**
+ * Sends feed report messages on behalf of an operator instance
+ * to the SuperFeedManager associated with the feed.
+ */
 public class FeedMessageService {
 
     private static final Logger LOGGER = Logger.getLogger(FeedMessageService.class.getName());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index 78b03ee..cc597cc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -1,7 +1,6 @@
 package edu.uci.ics.asterix.metadata.feeds;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -14,7 +13,6 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -22,15 +20,42 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
 
+/**
+ * FeedMetaOperatorDescriptor is a wrapper operator that provides a sandbox-like
+ * environment for a Hyracks operator that is part of a feed ingestion pipeline.
+ * The MetaFeed operator provides an interface identical to that offered by the
+ * underlying wrapped operator, hereafter referred to as the core operator.
+ * As seen by Hyracks, the altered pipeline is identical to the earlier version formed
+ * from core operators. The MetaFeed operator enhances each core operator by providing
+ * functionality for handling runtime exceptions, saving state for future retrieval,
+ * and measuring/reporting performance characteristics, all of which contribute to
+ * the fault tolerance of feed ingestion.
+ */
 public class FeedMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
 
     private static final Logger LOGGER = Logger.getLogger(FeedMetaOperatorDescriptor.class.getName());
 
+    /** The actual (Hyracks) operator that is wrapped by the MetaFeed operator. **/
     private IOperatorDescriptor coreOperator;
+
+    /**
+     * A unique identifier for the feed instance. A feed instance represents the flow of data
+     * from a feed to a dataset.
+     **/
     private final FeedConnectionId feedConnectionId;
+
+    /**
+     * The policy associated with the feed instance.
+     */
     private final FeedPolicy feedPolicy;
+
+    /**
+     * The type of the feed runtime associated with the operator.
+     * Possible values: INGESTION, COMPUTE, STORAGE, COMMIT.
+     */
     private final FeedRuntimeType runtimeType;
 
     public FeedMetaOperatorDescriptor(JobSpecification spec, FeedConnectionId feedConnectionId,
@@ -59,16 +84,44 @@
 
     private static class FeedMetaNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
 
+        /** Runtime node pushable corresponding to the core feed operator **/
         private AbstractUnaryInputUnaryOutputOperatorNodePushable coreOperatorNodePushable;
+
+        /**
+         * A policy enforcer that ensures dynamic decisions for a feed are taken in accordance
+         * with the associated ingestion policy.
+         **/
         private FeedPolicyEnforcer policyEnforcer;
+
+        /**
+         * The Feed Runtime instance associated with the operator. Feed Runtime captures the state of the operator while
+         * the feed is active.
+         */
         private FeedRuntime feedRuntime;
+
+        /**
+         * A unique identifier for the feed instance. A feed instance represents the flow of data
+         * from a feed to a dataset.
+         **/
         private FeedConnectionId feedId;
+
+        /** Denotes the i-th of K operator instances scheduled to run in parallel. **/
         private int partition;
+
+        /** A buffer that is used to hold the current frame that is being processed **/
         private ByteBuffer currentBuffer;
+
+        /** Type associated with the core feed operator **/
         private final FeedRuntimeType runtimeType;
+
+        /** True if the feed is recovering from a previously failed execution. **/
         private boolean resumeOldState;
-        private ExecutorService feedExecService;
+
+        /** The ID of the host Node Controller (NC). **/
         private String nodeId;
+
+        /** Allows iterating over the tuples in a frame. **/
         private FrameTupleAccessor fta;
 
         public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
@@ -93,21 +146,25 @@
                     feedRuntime = new FeedRuntime(feedId, partition, runtimeType);
                     FeedManager.INSTANCE.registerFeedRuntime(feedRuntime);
                     if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Did not find a saved state, starting fresh for " + runtimeType + " node.");
+                        LOGGER.warning("Did not find a saved state from a previous zombie, starting a new instance for "
+                                + runtimeType + " node.");
                     }
                     resumeOldState = false;
                 } else {
                     if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Resuming from saved state (if any) of " + runtimeType + " node.");
+                        LOGGER.warning("Retrieved state from the zombie instance of the previous execution for "
+                                + runtimeType + " node.");
                     }
                     resumeOldState = true;
                 }
-                feedExecService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
                 FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
                         runtimeType, partition, fta);
                 coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
                 coreOperatorNodePushable.open();
             } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.severe("Unable to initialize feed operator " + feedRuntime + " [" + partition + "]");
+                }
                 throw new HyracksDataException(e);
             }
         }
@@ -117,7 +174,8 @@
             try {
                 if (resumeOldState) {
                     if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Old state " + feedRuntime.getRuntimeState().getFrame());
+                        LOGGER.warning("State from previous zombie instance "
+                                + feedRuntime.getRuntimeState().getFrame());
                     }
                     coreOperatorNodePushable.nextFrame(feedRuntime.getRuntimeState().getFrame());
                     feedRuntime.setRuntimeState(null);
@@ -131,8 +189,18 @@
                         // log the tuple
                         FeedRuntimeState runtimeState = new FeedRuntimeState(buffer, writer, e);
                         feedRuntime.setRuntimeState(runtimeState);
+                        String message = e.getMessage();
+                        int tupleIndex = 0;
+                        int separatorIndex = message.lastIndexOf(':');
+                        if (separatorIndex >= 0) {
+                            // the offending tuple index is encoded after the last ':' in the message
+                            tupleIndex = Integer.parseInt(message.substring(separatorIndex + 1).trim());
+                        }
+                        fta.reset(buffer);
+                        int endOffset = fta.getTupleEndOffset(tupleIndex);
+                        buffer.flip();
+                        buffer.position(endOffset + 1);
                         if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Harmful exception (parked data) " + e);
+                            LOGGER.warning("Harmful exception (parked data) at tuple index " + tupleIndex + ": " + e);
                         }
                     } else {
                         // ignore the frame (exception is expected)
@@ -140,8 +208,10 @@
                             LOGGER.warning("Ignoring exception " + e);
                         }
                     }
-
                 } else {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.severe("Feed policy does not require feed to survive soft failure");
+                    }
                     throw e;
                 }
             }
@@ -156,7 +226,7 @@
                     return false;
                 } else {
                     if (LOGGER.isLoggable(Level.SEVERE)) {
-                        LOGGER.warning("Received duplicate key exception!");
+                        LOGGER.severe("Received duplicate key exception!");
                     }
                     return true;
                 }
@@ -199,12 +269,4 @@
         return coreOperator;
     }
 
-    public FeedConnectionId getFeedConnectionId() {
-        return feedConnectionId;
-    }
-
-    public FeedPolicy getFeedPolicy() {
-        return feedPolicy;
-    }
-
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
index d0a83d8..fbff931 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedReport.java
@@ -13,10 +13,22 @@
     private int partition = -1;
     private FeedRuntimeType runtimeType;
     private long value = -1;
-    private final String[] representation;
+    private String[] representation;
 
-    public FeedReport(String rep) {
-        representation = rep.split("\\|");
+    public FeedReport() {
+    }
+
+    public FeedReport(String message) {
+        representation = message.split("\\|");
+    }
+
+    public void reset(String message) {
+        representation = message.split("\\|");
+        reportType = null;
+        feedId = null;
+        runtimeType = null;
+        partition = -1;
+        value = -1;
     }
 
     @Override
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index 18177c2..a45456b6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -33,24 +33,51 @@
 import edu.uci.ics.asterix.metadata.feeds.MessageListener.IMessageAnalyzer;
 import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
 
+/**
+ * The feed operators running in an NC report their health (statistics) to the local Feed Manager.
+ * A feed thus has a Feed Manager per NC. From amongst the Feed Managers, a SuperFeedManager is chosen (randomly).
+ * The SuperFeedManager collects reports from the Feed Managers and thus has a global cluster view of
+ * how the different feed operators, running in a distributed fashion, are performing.
+ */
 public class SuperFeedManager {
 
     private static final Logger LOGGER = Logger.getLogger(SuperFeedManager.class.getName());
+
+    /**
+     * IP Address or DNS name of the host where Super Feed Manager is running.
+     */
     private String host;
 
     private AtomicInteger availablePort; // starting value is fixed
 
+    /**
+     * The port at which the SuperFeedManager listens for connections by other Feed Managers.
+     */
     private final int feedReportPort; // fixed
 
+    /**
+     * The port at which the SuperFeedManager listens for connections from clients that wish
+     * to subscribe to the feed health reports, e.g., the feed management console.
+     */
     private final int feedReportSubscribePort; // fixed
 
+    /**
+     * The ID of the Node Controller.
+     */
     private final String nodeId;
 
+    /**
+     * A unique identifier for the feed instance. A feed instance represents the flow of data
+     * from a feed to a dataset.
+     **/
     private final FeedConnectionId feedConnectionId;
 
+    /**
+     * Set to true if the Super Feed Manager is local to the NC.
+     **/
     private boolean isLocal = false;
 
-    private SuperFeedReportService sfmService;
+    private FeedReportDestinationSocketProvider sfmService;
 
     private SuperFeedReportSubscriptionService subscriptionService;
 
@@ -102,7 +129,8 @@
     public void start() throws IOException {
         if (sfmService == null) {
             ExecutorService executorService = FeedManager.INSTANCE.getFeedExecutorService(feedConnectionId);
-            sfmService = new SuperFeedReportService(feedReportPort, feedReportInbox, feedConnectionId, availablePort);
+            sfmService = new FeedReportDestinationSocketProvider(feedReportPort, feedReportInbox, feedConnectionId,
+                    availablePort);
             executorService.execute(sfmService);
             subscriptionService = new SuperFeedReportSubscriptionService(feedConnectionId, feedReportSubscribePort,
                     sfmService.getMesgAnalyzer(), availablePort);
@@ -144,10 +172,10 @@
         private AtomicInteger subscriptionPort;
         private boolean active = true;
         private String EOM = "\n";
-        private final SFMessageAnalyzer reportProvider;
+        private final FeedReportProvider reportProvider;
         private final List<FeedDataProviderService> dataProviders = new ArrayList<FeedDataProviderService>();
 
-        public SuperFeedReportSubscriptionService(FeedConnectionId feedId, int port, SFMessageAnalyzer reportProvider,
+        public SuperFeedReportSubscriptionService(FeedConnectionId feedId, int port, FeedReportProvider reportProvider,
                 AtomicInteger nextPort) throws IOException {
             this.feedId = feedId;
             serverFeedSubscribe = FeedManager.INSTANCE.getFeedRuntimeManager(feedId).createServerSocket(port);
@@ -238,36 +266,35 @@
 
     }
 
-    private static class SuperFeedReportService implements Runnable {
+    private static class FeedReportDestinationSocketProvider implements Runnable {
+
+        private static final String EOM = "\n";
 
         private AtomicInteger nextPort;
-        private final ServerSocket serverFeedReport;
-
-        private String EOM = "\n";
-
+        private final ServerSocket feedReportSocket;
         private final LinkedBlockingQueue<String> inbox;
         private final List<MessageListener> messageListeners;
-        private final SFMessageAnalyzer mesgAnalyzer;
+        private final FeedReportProvider mesgAnalyzer;
         private final FeedConnectionId feedId;
         private boolean process = true;
 
-        public SuperFeedReportService(int port, LinkedBlockingQueue<String> inbox, FeedConnectionId feedId,
-                AtomicInteger availablePort) throws IOException {
+        public FeedReportDestinationSocketProvider(int port, LinkedBlockingQueue<String> inbox,
+                FeedConnectionId feedId, AtomicInteger availablePort) throws IOException {
             FeedRuntimeManager runtimeManager = FeedManager.INSTANCE.getFeedRuntimeManager(feedId);
-            serverFeedReport = runtimeManager.createServerSocket(port);
-            nextPort = availablePort;
+            this.feedReportSocket = runtimeManager.createServerSocket(port);
+            this.nextPort = availablePort;
             this.inbox = inbox;
             this.feedId = feedId;
             this.messageListeners = new ArrayList<MessageListener>();
-            mesgAnalyzer = new SFMessageAnalyzer(inbox, feedId);
+            this.mesgAnalyzer = new FeedReportProvider(inbox, feedId);
             FeedManager.INSTANCE.getFeedExecutorService(feedId).execute(mesgAnalyzer);
         }
 
         public void stop() {
             process = false;
-            if (serverFeedReport != null) {
+            if (feedReportSocket != null) {
                 try {
-                    serverFeedReport.close();
+                    feedReportSocket.close();
                     process = false;
                 } catch (IOException e) {
                     e.printStackTrace();
@@ -284,12 +311,18 @@
             Socket client = null;
             while (process) {
                 try {
-                    client = serverFeedReport.accept();
-                    OutputStream os = client.getOutputStream();
+                    client = feedReportSocket.accept();
                     int port = nextPort.incrementAndGet();
+                    /**
+                     * MessageListener provides the functionality of listening at a port for messages
+                     * and delivering each received message to an input queue (inbox).
+                     */
                     MessageListener listener = new MessageListener(port, inbox);
                     listener.start();
-                    messageListeners.add(listener);
+                    synchronized (messageListeners) {
+                        messageListeners.add(listener);
+                    }
+                    OutputStream os = client.getOutputStream();
                     os.write((port + EOM).getBytes());
                     os.flush();
                 } catch (IOException e) {
@@ -308,13 +341,18 @@
             }
         }
 
-        public SFMessageAnalyzer getMesgAnalyzer() {
+        public FeedReportProvider getMesgAnalyzer() {
             return mesgAnalyzer;
         }
 
     }
 
-    private static class SFMessageAnalyzer implements Runnable {
+    /**
+     * The report messages sent by the feed operators are received by the FeedReportProvider,
+     * which is responsible for distributing the messages to the subscribers.
+     * The Feed Management Console is an example of a subscriber.
+     */
+    private static class FeedReportProvider implements Runnable {
 
         private final LinkedBlockingQueue<String> inbox;
         private final FeedConnectionId feedId;
@@ -322,13 +360,12 @@
         private final List<LinkedBlockingQueue<String>> subscriptionQueues;
         private final Map<String, String> ingestionThroughputs;
 
-        public SFMessageAnalyzer(LinkedBlockingQueue<String> inbox, FeedConnectionId feedId)
+        public FeedReportProvider(LinkedBlockingQueue<String> inbox, FeedConnectionId feedId)
                 throws UnknownHostException, IOException {
             this.inbox = inbox;
             this.feedId = feedId;
             this.subscriptionQueues = new ArrayList<LinkedBlockingQueue<String>>();
             this.ingestionThroughputs = new HashMap<String, String>();
-            String ccHost = AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp();
         }
 
         public void stop() {
@@ -345,170 +382,62 @@
 
         public void run() {
             StringBuilder finalMessage = new StringBuilder();
+            FeedReport report = new FeedReport();
             while (process) {
                 try {
                     String message = inbox.take();
-                    FeedReport report = new FeedReport(message);
+                    report.reset(message);
                     FeedReportMessageType mesgType = report.getReportType();
                     switch (mesgType) {
                         case THROUGHPUT:
                             if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.warning("SuperFeedManager received message " + message);
+                                LOGGER.info("Feed Health Report " + message);
                             }
-                            for (LinkedBlockingQueue<String> q : subscriptionQueues) {
-                                String[] comp = message.split("\\|");
-                                String parition = comp[3];
-                                String tput = comp[4];
-                                String location = comp[5];
-                                ingestionThroughputs.put(parition, tput);
-                                // throughput in partition order
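+                            // report format: type|dataverse:feed:dataset|runtimeType|partition|value|nodeId|timestamp|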
+                            String[] msgComponents = message.split("\\|");
+                            String partition = msgComponents[3];
+                            String tput = msgComponents[4];
+                            String timestamp = msgComponents[6];
+
+                            boolean dispatchReport = true;
+                            if (ingestionThroughputs.get(partition) == null) {
+                                ingestionThroughputs.put(partition, tput);
+                                dispatchReport = false;
+                            } else {
                                 for (int i = 0; i < ingestionThroughputs.size(); i++) {
                                     String tp = ingestionThroughputs.get(i + "");
-                                    finalMessage.append(tp + "|");
+                                    if (tp != null) {
+                                        ingestionThroughputs.put(i + "", null);
+                                        finalMessage.append(tp + "|");
+                                    } else {
+                                        dispatchReport = false;
+                                        break;
+                                    }
                                 }
-                                q.add(finalMessage.toString());
-                                finalMessage.delete(0, finalMessage.length());
+                                ingestionThroughputs.put(partition, tput);
                             }
+
+                            if (dispatchReport) {
+                                String dispatchedReport = finalMessage.toString();
+                                if (LOGGER.isLoggable(Level.INFO)) {
+                                    LOGGER.info("Dispatched report " + dispatchedReport);
+                                }
+                                for (LinkedBlockingQueue<String> q : subscriptionQueues) {
+                                    q.add(dispatchedReport);
+                                }
+                            }
+                            finalMessage.delete(0, finalMessage.length());
                             break;
                         case CONGESTION:
                             // congestionInbox.add(report);
                             break;
                     }
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning(message);
-                    }
                 } catch (InterruptedException e) {
                     if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Unable to process messages " + e.getMessage());
+                        LOGGER.info("Unable to process messages " + e.getMessage() + " for feed " + feedId);
                     }
                 }
             }
         }
 
-        private static class SuperFeedManagerMessageAnalzer implements IMessageAnalyzer {
-
-            private String ccHost;
-            private CongestionAnalyzer congestionAnalyzer;
-            private LinkedBlockingQueue<FeedReport> congestionInbox = new LinkedBlockingQueue<FeedReport>();
-            private ExecutorService executorService;
-            private LinkedBlockingQueue<String> mesgInbox = new LinkedBlockingQueue<String>();
-
-            public SuperFeedManagerMessageAnalzer(FeedConnectionId feedId) {
-                ccHost = AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp();
-                congestionAnalyzer = new CongestionAnalyzer(congestionInbox);
-                executorService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
-                executorService.execute(congestionAnalyzer);
-            }
-
-            public void receiveMessage(String message) {
-                Socket socket = null;
-                OutputStream os = null;
-                try {
-                    FeedReport report = new FeedReport(message);
-                    FeedReportMessageType mesgType = report.getReportType();
-                    switch (mesgType) {
-                        case THROUGHPUT:
-                            //send message to FeedHealthDataReceiver at CC (2999)
-                            socket = new Socket(ccHost, 2999);
-                            os = socket.getOutputStream();
-                            os.write(message.getBytes());
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.warning("SuperFeedManager received message " + message);
-                            }
-                            break;
-                        case CONGESTION:
-                            congestionInbox.add(report);
-                            break;
-                    }
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning(message);
-                    }
-                    if (os != null) {
-                        os.close();
-                    }
-                    if (socket != null) {
-                        socket.close();
-                    }
-                } catch (IOException ioe) {
-                    ioe.printStackTrace();
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("unable to send message to FeedHealthDataReceiver");
-                    }
-                }
-            }
-
-            @Override
-            public LinkedBlockingQueue<String> getMessageQueue() {
-                return mesgInbox;
-            }
-
-        }
-
-        private static class CongestionAnalyzer implements Runnable {
-
-            private final LinkedBlockingQueue<FeedReport> inbox;
-            private FeedReport lastMaxReport;
-            private int congestionCount;
-            private long lastReportedCongestion = 0;
-            private long closeEnoughTimeBound = FeedFrameWriter.FLUSH_THRESHOLD_TIME * 2;
-
-            public CongestionAnalyzer(LinkedBlockingQueue<FeedReport> inbox) {
-                this.inbox = inbox;
-            }
-
-            @Override
-            public void run() {
-                FeedReport report;
-                while (true) {
-                    try {
-                        report = inbox.take();
-                        long currentReportedCongestionTime = System.currentTimeMillis();
-                        boolean closeEnough = lastReportedCongestion == 0
-                                || currentReportedCongestionTime - lastReportedCongestion < closeEnoughTimeBound;
-                        if (lastMaxReport == null) {
-                            lastMaxReport = report;
-                            if (closeEnough) {
-                                congestionCount++;
-                            }
-                        } else {
-                            if (report.compareTo(lastMaxReport) > 0) {
-                                lastMaxReport = report;
-                                congestionCount = 1;
-                            } else if (report.compareTo(lastMaxReport) == 0) {
-                                lastMaxReport = report;
-                                if (closeEnough) {
-                                    congestionCount++;
-                                    if (congestionCount > 5) {
-                                        FeedRuntimeType sourceOfCongestion = null;
-                                        switch (lastMaxReport.getRuntimeType()) {
-                                            case INGESTION:
-                                                sourceOfCongestion = FeedRuntimeType.COMPUTE;
-                                                break;
-                                            case COMPUTE:
-                                                sourceOfCongestion = FeedRuntimeType.STORAGE;
-                                                break;
-                                            case STORAGE:
-                                            case COMMIT:
-                                                sourceOfCongestion = FeedRuntimeType.COMMIT;
-                                                break;
-                                        }
-                                        if (LOGGER.isLoggable(Level.WARNING)) {
-                                            LOGGER.warning(" Need elasticity at " + sourceOfCongestion
-                                                    + " as per report " + lastMaxReport);
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                        lastReportedCongestion = System.currentTimeMillis();
-
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
-                }
-
-            }
-
-        }
     }
-}
\ No newline at end of file
+}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java
index 703aacb..6d0b1f9 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TweetGenerator2.java
@@ -16,6 +16,10 @@
 
     public static final String KEY_DURATION = "duration";
     public static final String KEY_TPS = "tps";
+    public static final String KEY_MIN_TPS = "tps-min";
+    public static final String KEY_MAX_TPS = "tps-max";
+
     public static final String KEY_TPUT_DURATION = "tput-duration";
   
     public static final String OUTPUT_FORMAT = "output-format";
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
index cc1c597..5739c70 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapter.java
@@ -8,6 +8,7 @@
 import java.net.UnknownHostException;
 import java.util.Date;
 import java.util.Map;
+import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
@@ -25,7 +26,6 @@
 /**
  * TPS can be configured between 1 and 20,000
  * 
- * @author ramang
  */
 public class TwitterFirehoseFeedAdapter extends StreamBasedAdapter implements IFeedAdapter {
 
@@ -134,7 +134,9 @@
         private Socket socket;
         private TweetGenerator2 tweetGenerator;
         private boolean continuePush = true;
-        private int tps;
+        private int fixedTps = -1;
+        private int minTps = -1;
+        private int maxTps = -1;
         private int tputDuration;
         private int partition;
         private Rate task;
@@ -144,7 +146,7 @@
 
         public static enum Mode {
             AGGRESSIVE,
-            CONTROLLED
+            CONTROLLED,
         }
 
         public void setPartition(int partition) {
@@ -156,16 +158,42 @@
                 String datasetName) throws Exception {
             this.serverSocket = serverSocket;
             this.tweetGenerator = new TweetGenerator2(configuration, 0, TweetGenerator.OUTPUT_FORMAT_ADM_STRING);
-            tps = Integer.parseInt(configuration.get(TweetGenerator2.KEY_TPS));
-            tputDuration = Integer.parseInt(configuration.get(TweetGenerator2.KEY_TPUT_DURATION));
-            task = new Rate(tweetGenerator, tputDuration, datasetName, partition);
             String value = configuration.get(KEY_MODE);
+            String confValue = null;
             if (value != null) {
                 mode = Mode.valueOf(value.toUpperCase());
+                switch (mode) {
+                    case AGGRESSIVE:
+                        break;
+                    case CONTROLLED:
+                        confValue = configuration.get(TweetGenerator2.KEY_TPS);
+                        if (confValue != null) {
+                            minTps = Integer.parseInt(confValue);
+                            maxTps = minTps;
+                            fixedTps = minTps;
+                        } else {
+                            confValue = configuration.get(TweetGenerator2.KEY_MIN_TPS);
+                            if (confValue != null) {
+                                minTps = Integer.parseInt(confValue);
+                            }
+                            confValue = configuration.get(TweetGenerator2.KEY_MAX_TPS);
+                            if (confValue != null) {
+                                maxTps = Integer.parseInt(confValue);
+                            }
+
+                            if (minTps < 0 || maxTps < 0 || minTps > maxTps) {
+                                throw new IllegalArgumentException("Incorrect value for min/max TPS");
+                            }
+                        }
+
+                }
             } else {
                 mode = Mode.AGGRESSIVE;
             }
 
+            tputDuration = Integer.parseInt(configuration.get(TweetGenerator2.KEY_TPUT_DURATION));
+            task = new Rate(tweetGenerator, tputDuration, datasetName, partition);
+
         }
 
         @Override
@@ -180,7 +208,14 @@
                     timer.schedule(task, tputDuration * 1000, tputDuration * 1000);
                     long startBatch;
                     long endBatch;
+                    Random random = new Random();
+                    int tps = 0;
                     while (moreData && continuePush) {
+                        if (maxTps > 0) {
+                            tps = minTps + random.nextInt((maxTps + 1) - minTps);
+                        } else {
+                            tps = fixedTps;
+                        }
                         startBatch = System.currentTimeMillis();
                         moreData = tweetGenerator.setNextRecordBatch(tps);
                         endBatch = System.currentTimeMillis();
@@ -253,9 +288,7 @@
             public void setPartition(int partition) {
                 this.partition = partition;
             }
-
         }
-
     }
 
     @Override
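To restate the push-loop change above: when a (min,max) range is configured, every batch draws a fresh TPS uniformly from [minTps, maxTps]; the fixedTps fallback covers the case where no range was set. A standalone sketch of that selection (hypothetical class name, not part of this patch):

    import java.util.Random;

    // Sketch of the per-batch TPS selection added to TwitterFirehoseFeedAdapter.
    // Random.nextInt(bound) excludes its bound, so (maxTps + 1) - minTps keeps
    // maxTps itself reachable.
    public class TpsSelectionSketch {
        private final Random random = new Random();
        private final int minTps;
        private final int maxTps;   // -1 when no range was configured
        private final int fixedTps; // -1 unless a single fixed "tps" was given

        public TpsSelectionSketch(int minTps, int maxTps, int fixedTps) {
            this.minTps = minTps;
            this.maxTps = maxTps;
            this.fixedTps = fixedTps;
        }

        public int nextTps() {
            return maxTps > 0 ? minTps + random.nextInt((maxTps + 1) - minTps) : fixedTps;
        }
    }

Note that when a single fixed "tps" is configured, the constructor in the diff sets minTps == maxTps == fixedTps, so the random draw degenerates to the fixed value.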
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
index 241ea6c..1d8a607 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/TwitterFirehoseFeedAdapterFactory.java
@@ -14,7 +14,10 @@
  */
 package edu.uci.ics.asterix.tools.external.data;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.external.adapter.factory.StreamBasedAdapterFactory;
@@ -24,6 +27,8 @@
 import edu.uci.ics.asterix.om.types.AUnorderedListType;
 import edu.uci.ics.asterix.om.types.BuiltinType;
 import edu.uci.ics.asterix.om.types.IAType;
+import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -46,9 +51,15 @@
 
     /*
      * Degree of parallelism for feed ingestion activity. Defaults to 1.
+     * This determines the count constraint for the ingestion operator.
      */
     private static final String KEY_INGESTION_CARDINALITY = "ingestion-cardinality";
 
+    /*
+     * The absolute locations where ingestion operator instances will be placed.
+     */
+    private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
+
     private static final ARecordType outputType = initOutputType();
 
     @Override
@@ -76,8 +87,23 @@
     @Override
     public AlgebricksPartitionConstraint getPartitionConstraint() throws Exception {
         String ingestionCardinalityParam = (String) configuration.get(KEY_INGESTION_CARDINALITY);
-        int requiredCardinality = ingestionCardinalityParam != null ? Integer.parseInt(ingestionCardinalityParam) : 1;
-        return new AlgebricksCountPartitionConstraint(requiredCardinality);
+        String ingestionLocationParam = (String) configuration.get(KEY_INGESTION_LOCATIONS);
+        String[] locations = null;
+        if (ingestionLocationParam != null) {
+            locations = ingestionLocationParam.split(",");
+        }
+        int count = locations != null ? locations.length : 1;
+        if (ingestionCardinalityParam != null) {
+            count = Integer.parseInt(ingestionCardinalityParam);
+        }
+
+        List<String> chosenLocations = new ArrayList<String>();
+        String[] availableLocations = locations != null ? locations : AsterixClusterProperties.INSTANCE
+                .getParticipantNodes().toArray(new String[] {});
+        for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
+            chosenLocations.add(availableLocations[k]);
+        }
+        return new AlgebricksAbsolutePartitionConstraint(chosenLocations.toArray(new String[] {}));
     }
 
     @Override
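A note on the placement change above: the factory now emits an AlgebricksAbsolutePartitionConstraint instead of a bare count constraint. It takes either the comma-separated "ingestion-location" list or, failing that, the cluster's participant nodes, and assigns the requested number of ingestion instances round-robin over that list, wrapping when the cardinality exceeds the number of nodes. A standalone sketch of the assignment (hypothetical class name):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the round-robin placement in getPartitionConstraint():
    // count instances are spread over availableLocations, wrapping around
    // when count exceeds the number of locations.
    public class RoundRobinPlacementSketch {
        public static String[] choose(String[] availableLocations, int count) {
            List<String> chosen = new ArrayList<String>();
            for (int i = 0, k = 0; i < count; i++, k = (k + 1) % availableLocations.length) {
                chosen.add(availableLocations[k]);
            }
            return chosen.toArray(new String[0]);
        }

        public static void main(String[] args) {
            // e.g. "ingestion-location"="nc1,nc2" with "ingestion-cardinality"=3
            // yields: nc1, nc2, nc1
            for (String location : choose("nc1,nc2".split(","), 3)) {
                System.out.println(location);
            }
        }
    }

If neither parameter is supplied, count defaults to 1 and the single instance lands on the first participant node.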