1) minor fix in feed console 2) fixed bug incorrect evaluation of feed adapter output type
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
index a0cf4f9..ed97117 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -30,10 +30,13 @@
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
 import edu.uci.ics.asterix.om.base.AString;
 import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
+import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -132,7 +135,7 @@
 
                 String feedArg = getStringArgument(f, 0);
                 String outputType = getStringArgument(f, 1);
-                String targetDataset = getStringArgument(f, 2);
+                String targetDataset = getStringArgument(f, 1);
 
                 AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
                 Pair<String, String> feedReference = parseDatasetReference(metadataProvider, feedArg);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
index 3721147..71d57d7 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/http/servlet/FeedDataProviderServlet.java
@@ -16,6 +16,8 @@
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -28,6 +30,11 @@
 import org.json.JSONObject;
 
 import edu.uci.ics.asterix.hyracks.bootstrap.FeedLifecycleListener;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityDetails;
+import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
 
 public class FeedDataProviderServlet extends HttpServlet {
@@ -41,16 +48,22 @@
         String dataverseName = request.getParameter("dataverse");
 
         String report = getFeedReport(feedName, datasetName, dataverseName);
-        System.out.println(" RECEIVED REPORT " + report);
+        System.out.println(" REPORT " + report);
         long timestamp = System.currentTimeMillis();
-        
-        JSONObject obj = new JSONObject();
-        JSONArray array = new JSONArray();
-        try {
-            obj.put("time", timestamp);
-            obj.put("value", report);
-        } catch (JSONException jsoe) {
-            throw new IOException(jsoe);
+        JSONObject obj = null;
+        if (report != null) {
+            JSONArray array = new JSONArray();
+            try {
+                obj = new JSONObject();
+                obj.put("type", "report");
+                obj.put("time", timestamp);
+                obj.put("value", report);
+            } catch (JSONException jsoe) {
+                throw new IOException(jsoe);
+            }
+        } else {
+            obj = verifyIfFeedIsAlive(dataverseName, feedName, datasetName);
+            // do null check
         }
 
         PrintWriter out = response.getWriter();
@@ -68,4 +81,42 @@
         }
         return report;
     }
+
+    private JSONObject verifyIfFeedIsAlive(String dataverseName, String feedName, String datasetName) {
+        JSONObject obj = null;
+        try {
+            MetadataTransactionContext ctx = MetadataManager.INSTANCE.beginTransaction();
+            List<FeedActivity> feedActivities = MetadataManager.INSTANCE
+                    .getActiveFeeds(ctx, dataverseName, datasetName);
+            boolean active = false;
+            FeedActivity activity = null;
+            for (FeedActivity feedActivity : feedActivities) {
+                active = (feedActivity.getFeedName().equals(feedName) && feedActivity.getActivityType().equals(
+                        FeedActivityType.FEED_BEGIN));
+                if (active) {
+                    activity = feedActivity;
+                    break;
+                }
+            }
+            if (active) {
+                Map<String, String> activityDetails = activity.getFeedActivityDetails();
+                String ingestLocations = activityDetails.get(FeedActivityDetails.INGEST_LOCATIONS);
+                String computeLocations = activityDetails.get(FeedActivityDetails.COMPUTE_LOCATIONS);
+                String storageLocations = activityDetails.get(FeedActivityDetails.STORAGE_LOCATIONS);
+                obj = new JSONObject();
+                obj.put("type", "reload");
+                obj.put("ingestLocations", ingestLocations);
+                obj.put("computeLocations", computeLocations);
+                obj.put("storageLocations", storageLocations);
+                System.out.println(" RE LOADING " + " ingestion at " + ingestLocations + " compute at "
+                        + computeLocations + " storage at " + storageLocations);
+            } else {
+                obj = null;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return obj;
+
+    }
 }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 6b79958..94eefb6 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -73,7 +73,6 @@
 import edu.uci.ics.asterix.file.DatasetOperations;
 import edu.uci.ics.asterix.file.DataverseOperations;
 import edu.uci.ics.asterix.file.FeedOperations;
-import edu.uci.ics.asterix.file.FeedUtil;
 import edu.uci.ics.asterix.file.IndexOperations;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
 import edu.uci.ics.asterix.metadata.IDatasetDetails;
@@ -98,6 +97,7 @@
 import edu.uci.ics.asterix.metadata.entities.NodeGroup;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
 import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.ATypeTag;
 import edu.uci.ics.asterix.om.types.IAType;
diff --git a/asterix-app/src/main/resources/feed/dashboard.html b/asterix-app/src/main/resources/feed/dashboard.html
index aa25bd8..365f2f5 100644
--- a/asterix-app/src/main/resources/feed/dashboard.html
+++ b/asterix-app/src/main/resources/feed/dashboard.html
@@ -36,6 +36,13 @@
 
          
         function onFeedReportReceived(data) {
+            var type = data["type"];
+            if (type == ("reload")) {
+              ingestLocations  = data["ingestLocations"];
+              computeLocations = data["computeLocations"];
+              storageLocations = data["storageLocations"];
+              document.location.reload(true);
+            } else {
             var report = data["value"];
             var tputArray = report.split("|");
             var covered = 0;
@@ -49,6 +56,7 @@
                 ingestionTimeSeries[j].append(data["time"], 0);
             }          
             ingestionTimeSeries[numIngestionNodes].append(data["time"], totalTput);
+            }
         }
 
         function myYRangeFunction(range) {
@@ -60,7 +68,7 @@
         function initTimeline(ingestLocations) {
 
           document.write("<i>" + "Feed Ingestion" + "<i>");
-          document.write("<br />" + "Ingestion Locations: " + ingestLocations);
+          document.write("<br />" + "Ingestion Locations: " + computeLocations);
           document.write("<br />" + "Compute Locations: " + computeLocations);
           document.write("<br />" + "Storage Locations: " + storageLocations);
           document.write("<br />" + "Ingestion Policy: " + ingestionPolicy);
@@ -69,7 +77,7 @@
           document.write("<br />");
     
           for( var i = 0; i < numIngestionNodes; i++){
-              graphNames[i] = ingestionNodes[i];
+              graphNames[i] = "Partition " + i;
           }          
 
           if(numIngestionNodes > 1){
diff --git a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
index 3ee53a6..ec8b6be 100644
--- a/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
+++ b/asterix-aql/src/main/java/edu/uci/ics/asterix/aql/expression/ConnectFeedStatement.java
@@ -32,6 +32,10 @@
 import edu.uci.ics.asterix.metadata.entities.Feed;
 import edu.uci.ics.asterix.metadata.entities.Function;
 import edu.uci.ics.asterix.metadata.feeds.BuiltinFeedPolicies;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
+import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
+import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 
 public class ConnectFeedStatement implements Statement {
@@ -74,41 +78,41 @@
         query = new Query();
         FunctionSignature appliedFunction = sourceFeed.getAppliedFunction();
         Function function = null;
-        String adaptorOutputType = null;
+        String adapterOutputType = null;
         if (appliedFunction != null) {
             function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction);
             if (function == null) {
                 throw new MetadataException(" Unknown function " + function);
-            } else {
-                if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)) {
-                    adaptorOutputType = targetDataset.getItemTypeName();
-                } else {
-                    if (function.getParams().size() > 1) {
-                        throw new MetadataException(" Incompatible function: " + appliedFunction
-                                + " Number if arguments must be 1");
-                    }
-                    adaptorOutputType = function.getParams().get(0);
-                }
+            } else if (function.getParams().size() > 1) {
+                throw new MetadataException(" Incompatible function: " + appliedFunction
+                        + " Number if arguments must be 1");
             }
-        } else {
-            adaptorOutputType = targetDataset.getItemTypeName();
         }
+
+        org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
+        try {
+            factoryOutput = FeedUtil.getFeedFactoryAndOutput(sourceFeed, mdTxnCtx);
+            adapterOutputType = factoryOutput.getRight().getTypeName();
+        } catch (AlgebricksException ae) {
+            throw new MetadataException(ae);
+        }
+
         StringBuilder builder = new StringBuilder();
         builder.append("set" + " " + FunctionUtils.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n");
         builder.append("insert into dataset " + datasetName + " ");
 
         if (appliedFunction == null) {
-            builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adaptorOutputType + "'"
+            builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adapterOutputType + "'"
                     + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
             builder.append(" return $x");
         } else {
             if (function.getLanguage().equalsIgnoreCase(Function.LANGUAGE_AQL)) {
                 String param = function.getParams().get(0);
                 builder.append(" (" + " for" + " " + param + " in feed-ingest ('" + feedName + "'" + "," + "'"
-                        + adaptorOutputType + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
+                        + adapterOutputType + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
                 builder.append(" let $y:=(" + function.getFunctionBody() + ")" + " return $y");
             } else {
-                builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adaptorOutputType
+                builder.append(" (" + " for $x in feed-ingest ('" + feedName + "'" + "," + "'" + adapterOutputType
                         + "'" + "," + "'" + targetDataset.getDatasetName() + "'" + ")");
                 builder.append(" let $y:=" + sourceFeed.getDataverseName() + "." + function.getName() + "(" + "$x"
                         + ")");
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 7dd2c3f..8a7f6e1 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -1397,7 +1397,7 @@
                 switch (fa.getActivityType()) {
                     case FEED_BEGIN:
                         if (!terminatedFeeds.contains(fid)) {
-                            if (aFeeds.get(fid) == null) {
+                            if (aFeeds.get(fid) == null || fa.getActivityId() > aFeeds.get(fid).getActivityId()) {
                                 aFeeds.put(fid, fa);
                             }
                         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 89368e7..5af996f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -19,10 +19,8 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
@@ -65,10 +63,10 @@
 import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
 import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.FeedMessageOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.feeds.FeedUtil;
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.IAdapterFactory.SupportedOperation;
 import edu.uci.ics.asterix.metadata.feeds.IFeedMessage;
-import edu.uci.ics.asterix.metadata.feeds.IFeedMessage.MessageType;
 import edu.uci.ics.asterix.metadata.feeds.IGenericAdapterFactory;
 import edu.uci.ics.asterix.metadata.feeds.ITypedAdapterFactory;
 import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
@@ -169,7 +167,7 @@
 
     private final AsterixStorageProperties storageProperties;
 
-    private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
+    public static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
 
     public String getPropertyValue(String propertyName) {
         return config.get(propertyName);
@@ -412,65 +410,30 @@
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(JobSpecification jobSpec,
             IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
 
-        DatasourceAdapter adapterEntity;
-        IAdapterFactory adapterFactory;
-        IAType adapterOutputType;
-        String adapterName;
-        String adapterFactoryClassname;
         FeedDataSource feedDataSource = (FeedDataSource) dataSource;
-        try {
-            adapterName = feedDataSource.getFeed().getAdaptorName();
-            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
-                    adapterName);
-            if (adapterEntity != null) {
-                adapterFactoryClassname = adapterEntity.getClassname();
-                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-            } else {
-                adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
-                if (adapterFactoryClassname != null) {
-                } else {
-                    adapterFactoryClassname = adapterName;
-                }
-                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
-            }
-
-            Map<String, String> configuration = feedDataSource.getFeed().getAdaptorConfiguration();
-
-            switch (adapterFactory.getAdapterType()) {
-                case TYPED:
-                    adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
-                    ((ITypedAdapterFactory) adapterFactory).configure(configuration);
-                    break;
-                case GENERIC:
-                    String outputTypeName = configuration.get("output-type-name");
-                    adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
-                            feedDataSource.getDatasourceDataverse(), outputTypeName).getDatatype();
-                    ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
-                    break;
-                default:
-                    throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new AlgebricksException("unable to create adapter  " + e);
-        }
-
-        ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
-                .getSerializerDeserializer(adapterOutputType);
-        RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
-
-        FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
-                BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
-
-        feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
-        FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
-                feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
-                        .getFeedConnectionId().getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
-                feedDesc, feedPolicy.getProperties());
-
+        FeedIntakeOperatorDescriptor feedIngestor = null;
+        org.apache.commons.lang3.tuple.Pair<IAdapterFactory, ARecordType> factoryOutput = null;
         AlgebricksPartitionConstraint constraint = null;
+
         try {
-            constraint = adapterFactory.getPartitionConstraint();
+            factoryOutput = FeedUtil.getFeedFactoryAndOutput(feedDataSource.getFeed(), mdTxnCtx);
+            IAdapterFactory adapterFactory = factoryOutput.getLeft();
+            ARecordType adapterOutputType = factoryOutput.getRight();
+
+            ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
+                    .getSerializerDeserializer(adapterOutputType);
+            RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+
+            FeedPolicy feedPolicy = (FeedPolicy) ((AqlDataSource) dataSource).getProperties().get(
+                    BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+
+            feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy.getPolicyName());
+            feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
+                    feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
+                            .getFeedConnectionId().getDatasetName()), adapterFactory, (ARecordType) adapterOutputType,
+                    feedDesc, feedPolicy.getProperties());
+
+            constraint = factoryOutput.getLeft().getPartitionConstraint();
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index 698e2ab..c06f884 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -6,8 +6,6 @@
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -27,32 +25,18 @@
 
     private IOperatorNodePushable nodePushable;
 
-    private FeedPolicyEnforcer policyEnforcer;
-
-    private FeedConnectionId feedId;
-
-    private LinkedBlockingQueue<Long> statsOutbox;
-
     private final boolean collectStatistics;
 
     private List<ByteBuffer> frames = new ArrayList<ByteBuffer>();
 
     private Mode mode;
 
-    private String nodeId;
-
     public static final long FLUSH_THRESHOLD_TIME = 5000;
 
     private FramePushWait framePushWait;
 
-    private FeedRuntimeType feedRuntimeType;
-
-    private int partition;
-
     private Timer timer;
 
-    private ExecutorService executorService;
-
     private FrameTupleAccessor fta;
 
     public enum Mode {
@@ -66,14 +50,8 @@
         this.writer = writer;
         this.mode = Mode.FORWARD;
         this.nodePushable = nodePushable;
-        this.feedId = feedId;
-        this.policyEnforcer = policyEnforcer;
-        this.feedRuntimeType = feedRuntimeType;
-        this.partition = partition;
-        this.executorService = FeedManager.INSTANCE.getFeedExecutorService(feedId);
         this.collectStatistics = policyEnforcer.getFeedPolicyAccessor().collectStatistics();
         if (collectStatistics) {
-            this.statsOutbox = new LinkedBlockingQueue<Long>();
             timer = new Timer();
             framePushWait = new FramePushWait(nodePushable, FLUSH_THRESHOLD_TIME, feedId, nodeId, feedRuntimeType,
                     partition, FLUSH_THRESHOLD_TIME, timer);
@@ -94,19 +72,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Switching to :" + newMode + " from " + this.mode);
         }
-        switch (newMode) {
-            case FORWARD:
-                this.mode = newMode;
-                break;
-            case STORE:
-                this.mode = newMode;
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Beginning to store frames :");
-                    LOGGER.info("Frames accumulated till now:" + frames.size());
-                }
-                break;
-        }
-
+        this.mode = newMode;
     }
 
     public List<ByteBuffer> getStoredFrames() {
@@ -201,6 +167,7 @@
 
         public void reset() {
             mesgService = null;
+            collectThroughput = true;
         }
 
         public void notifyFinish(int numTuples) {
@@ -220,7 +187,8 @@
                 }
             }
             if (collectThroughput) {
-                System.out.println(" NUMBER of TUPLES " + numTuplesInInterval.get() + " in  " + period);
+                System.out.println(" NUMBER of TUPLES " + numTuplesInInterval.get() + " in  " + period
+                        + " for partition " + partition);
                 int instantTput = (int) Math.ceil((((double) numTuplesInInterval.get() * 1000) / period));
                 sendReportToSFM(instantTput, FeedReportMessageType.THROUGHPUT);
             }
@@ -247,6 +215,7 @@
             try {
                 mesgService.sendMessage(message);
             } catch (IOException ioe) {
+                ioe.printStackTrace();
                 if (LOGGER.isLoggable(Level.WARNING)) {
                     LOGGER.warning("Unable to send feed report to SFM for feed " + feedId + " " + feedRuntimeType + "["
                             + partition + "]");
@@ -254,6 +223,14 @@
             }
         }
 
+        public void deactivate() {
+            collectThroughput = false;
+        }
+
+        public void activate() {
+            collectThroughput = true;
+        }
+
         private enum State {
             INTIALIZED,
             WAITING_FOR_FLUSH_COMPLETION,
@@ -262,99 +239,23 @@
 
     }
 
-    private static class FeedOperatorStatisticsCollector implements Runnable {
-
-        private final FeedConnectionId feedId;
-        private final LinkedBlockingQueue<Long> inbox;
-        private final long[] readings;
-        private int readingIndex = 0;
-        private int historySize = 10;
-        private double runningAvg = -1;
-        private double deviationPercentageThreshold = 50;
-        private int successiveThresholds = 0;
-        private IOperatorNodePushable coreOperatorNodePushable;
-        private int count;
-
-        public FeedOperatorStatisticsCollector(FeedConnectionId feedId, LinkedBlockingQueue<Long> inbox,
-                IOperatorNodePushable coreOperatorNodePushable) {
-            this.feedId = feedId;
-            this.inbox = inbox;
-            this.readings = new long[historySize];
-            this.coreOperatorNodePushable = coreOperatorNodePushable;
-        }
-
-        @Override
-        public void run() {
-            SuperFeedManager sfm = null;
-            try {
-                while (sfm == null) {
-                    sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
-                    if (sfm == null) {
-                        Thread.sleep(2000);
-                    }
-                }
-
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Obtained SFM " + sfm + " " + coreOperatorNodePushable.getDisplayName());
-                }
-                while (true) {
-                    Long reading = inbox.take();
-                    if (count != historySize) {
-                        count++;
-                    }
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Obtained Reading " + reading + " " + coreOperatorNodePushable.getDisplayName());
-                    }
-                    double newRunningAvg;
-                    double deviation = 0;
-                    if (runningAvg >= 0) {
-                        int prevIndex = readingIndex == 0 ? historySize - 1 : readingIndex - 1;
-                        newRunningAvg = (runningAvg * count - readings[prevIndex] + reading) / (count);
-                        deviation = reading - runningAvg;
-                    } else {
-                        newRunningAvg = reading;
-                    }
-
-                    double devPercentage = (deviation * 100 / runningAvg);
-
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Current reading :" + reading + " Previous avg:" + runningAvg + " New Average: "
-                                + newRunningAvg + " deviation % " + devPercentage + " Op "
-                                + coreOperatorNodePushable.getDisplayName());
-                    }
-
-                    if (devPercentage > deviationPercentageThreshold) {
-                        successiveThresholds++;
-                        if (successiveThresholds > 1) {
-                            if (LOGGER.isLoggable(Level.SEVERE)) {
-                                LOGGER.severe("CONGESTION in sending frames by "
-                                        + coreOperatorNodePushable.getDisplayName());
-                            }
-                            successiveThresholds = 0;
-                        }
-                    } else {
-                        runningAvg = newRunningAvg;
-                        readings[readingIndex] = reading;
-                        readingIndex = (readingIndex + 1) % historySize;
-                    }
-                }
-            } catch (InterruptedException ie) {
-                // do nothing
-            }
-        }
-    }
-
     @Override
     public void fail() throws HyracksDataException {
         writer.fail();
-        if (framePushWait != null) {
+        if (framePushWait != null && !framePushWait.feedRuntimeType.equals(FeedRuntimeType.INGESTION)) {
             framePushWait.cancel();
+            framePushWait.deactivate();
         }
+        framePushWait.reset();
     }
 
     @Override
     public void close() throws HyracksDataException {
         if (framePushWait != null) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Closing frame statistics collection activity" + framePushWait);
+            }
+            framePushWait.deactivate();
             framePushWait.cancel();
         }
         writer.close();
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index ca73324..5ed4ccc 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.asterix.metadata.feeds;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
@@ -21,6 +22,7 @@
 
 import edu.uci.ics.asterix.metadata.feeds.AdapterRuntimeManager.State;
 import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -42,6 +44,7 @@
     private FeedRuntime ingestionRuntime;
     private final String nodeId;
     private FrameTupleAccessor fta;
+    private FeedFrameWriter feedFrameWriter;
 
     public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedId, IFeedAdapter adapter,
             Map<String, String> feedPolicy, int partition, IngestionRuntime ingestionRuntime) {
@@ -63,9 +66,9 @@
         AdapterRuntimeManager adapterRuntimeMgr = null;
         try {
             if (ingestionRuntime == null) {
-                FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
+                feedFrameWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
                         FeedRuntimeType.INGESTION, partition, fta);
-                adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, mWriter, partition, inbox);
+                adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, feedFrameWriter, partition, inbox);
 
                 if (adapter instanceof AbstractFeedDatasourceAdapter) {
                     ((AbstractFeedDatasourceAdapter) adapter).setFeedPolicyEnforcer(policyEnforcer);
@@ -73,7 +76,7 @@
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Beginning new feed:" + feedId);
                 }
-                mWriter.open();
+                feedFrameWriter.open();
                 adapterRuntimeMgr.start();
             } else {
                 adapterRuntimeMgr = ((IngestionRuntime) ingestionRuntime).getAdapterRuntimeManager();
@@ -85,6 +88,7 @@
                 adapterRuntimeMgr.getAdapterExecutor().setWriter(writer);
                 adapterRuntimeMgr.getAdapterExecutor().getWriter().reset();
                 adapterRuntimeMgr.setState(State.ACTIVE_INGESTION);
+                feedFrameWriter = adapterRuntimeMgr.getAdapterExecutor().getWriter();
             }
 
             ingestionRuntime = adapterRuntimeMgr.getIngestionRuntime();
@@ -94,22 +98,30 @@
                 }
             }
             FeedManager.INSTANCE.deRegisterFeedRuntime(ingestionRuntime.getFeedRuntimeId());
+            feedFrameWriter.close();
         } catch (InterruptedException ie) {
             if (policyEnforcer.getFeedPolicyAccessor().continueOnHardwareFailure()) {
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Continuing on failure as per feed policy");
                 }
                 adapterRuntimeMgr.setState(State.INACTIVE_INGESTION);
-                writer.fail();
+                FeedRuntimeManager runtimeMgr = FeedManager.INSTANCE.getFeedRuntimeManager(feedId);
+                try {
+                    runtimeMgr.close(false);
+                } catch (IOException ioe) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning("Unable to close Feed Runtime Manager " + ioe.getMessage());
+                    }
+                }
+                feedFrameWriter.fail();
             } else {
                 FeedManager.INSTANCE.deRegisterFeedRuntime(ingestionRuntime.getFeedRuntimeId());
+                feedFrameWriter.close();
                 throw new HyracksDataException(ie);
             }
         } catch (Exception e) {
             e.printStackTrace();
             throw new HyracksDataException(e);
-        } finally {
-            writer.close();
         }
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
index 83060f2..d910ca8 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedManager.java
@@ -62,7 +62,7 @@
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("Closing feed runtime manager: " + mgr);
                 }
-                mgr.close();
+                mgr.close(true);
             }
         } catch (Exception e) {
             if (LOGGER.isLoggable(Level.WARNING)) {
@@ -98,6 +98,9 @@
         FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedRuntimeId.getFeedId());
         if (runtimeMgr != null) {
             runtimeMgr.deregisterFeedRuntime(feedRuntimeId);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Deregistered Feed Runtime " + feedRuntimeId);
+            }
         }
     }
 
@@ -112,6 +115,9 @@
         FeedRuntimeManager runtimeMgr = feedRuntimeManagers.get(feedId);
         if (runtimeMgr != null) {
             runtimeMgr.setSuperFeedManager(sfm);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("Registered Super Feed Manager " + sfm);
+            }
         }
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
index 8838ad4..a8541f5 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageOperatorNodePushable.java
@@ -17,7 +17,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeId;
 import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
index c93bb5a..2e31f6d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMessageService.java
@@ -61,6 +61,9 @@
                         LOGGER.warning("Unable to start feed message service for " + feedId);
                     }
                 }
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("ENDED FEED MESSAGE SERVICE for " + feedId);
+                }
             } catch (Exception e) {
                 if (LOGGER.isLoggable(Level.WARNING)) {
                     LOGGER.warning("Exception in handling incoming feed messages" + e.getMessage());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
index 3b2eaaf..b4951ec 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedRuntimeManager.java
@@ -38,7 +38,7 @@
         feedReportQueue = new LinkedBlockingQueue<String>();
     }
 
-    public void close() throws IOException {
+    public void close(boolean closeAll) throws IOException {
         socketFactory.close();
 
         if (messageService != null) {
@@ -46,6 +46,7 @@
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Shut down message service s for :" + feedId);
             }
+            messageService = null;
         }
         if (superFeedManager != null && superFeedManager.isLocal()) {
             superFeedManager.stop();
@@ -54,10 +55,12 @@
             }
         }
 
-        if (executorService != null) {
-            executorService.shutdownNow();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Shut down executor service for :" + feedId);
+        if (closeAll) {
+            if (executorService != null) {
+                executorService.shutdownNow();
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Shut down executor service for :" + feedId);
+                }
             }
         }
     }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
similarity index 74%
rename from asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
rename to asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
index a09ca3c..e6bb81d 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedUtil.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedUtil.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.asterix.file;
+package edu.uci.ics.asterix.metadata.feeds;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -11,14 +11,20 @@
 import org.apache.commons.lang3.tuple.Pair;
 
 import edu.uci.ics.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import edu.uci.ics.asterix.metadata.MetadataManager;
+import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
+import edu.uci.ics.asterix.metadata.entities.Feed;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity;
 import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
 import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
-import edu.uci.ics.asterix.metadata.feeds.FeedConnectionId;
-import edu.uci.ics.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
-import edu.uci.ics.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
 import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
 import edu.uci.ics.asterix.om.types.ARecordType;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
@@ -171,4 +177,53 @@
 
     }
 
+    public static Pair<IAdapterFactory, ARecordType> getFeedFactoryAndOutput(Feed feed,
+            MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
+
+        String adapterName = null;
+        DatasourceAdapter adapterEntity = null;
+        String adapterFactoryClassname = null;
+        IAdapterFactory adapterFactory = null;
+        ARecordType adapterOutputType = null;
+        Pair<IAdapterFactory, ARecordType> feedProps = null;
+        try {
+            adapterName = feed.getAdaptorName();
+            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
+                    adapterName);
+            if (adapterEntity != null) {
+                adapterFactoryClassname = adapterEntity.getClassname();
+                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+            } else {
+                adapterFactoryClassname = AqlMetadataProvider.adapterFactoryMapping.get(adapterName);
+                if (adapterFactoryClassname != null) {
+                } else {
+                    adapterFactoryClassname = adapterName;
+                }
+                adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+            }
+
+            Map<String, String> configuration = feed.getAdaptorConfiguration();
+
+            switch (adapterFactory.getAdapterType()) {
+                case TYPED:
+                    adapterOutputType = ((ITypedAdapterFactory) adapterFactory).getAdapterOutputType();
+                    ((ITypedAdapterFactory) adapterFactory).configure(configuration);
+                    break;
+                case GENERIC:
+                    String outputTypeName = configuration.get("output-type-name");
+                    adapterOutputType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
+                            feed.getDataverseName(), outputTypeName).getDatatype();
+                    ((IGenericAdapterFactory) adapterFactory).configure(configuration, (ARecordType) adapterOutputType);
+                    break;
+                default:
+                    throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
+            }
+
+            feedProps = Pair.of(adapterFactory, adapterOutputType);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException("unable to create adapter  " + e);
+        }
+        return feedProps;
+    }
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java
index 3be39f3..20a6b0e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/MessageListener.java
@@ -28,11 +28,11 @@
     }
 
     public void stop() {
+        listenerServer.stop();
+        System.out.println("STOPPED MESSAGE RECEIVING SERVICE AT " + port);
         if (!executorService.isShutdown()) {
             executorService.shutdownNow();
         }
-        listenerServer.stop();
-        System.out.println("STOPPED MESSAGE RECEIVING SERVICE AT " + port);
 
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index 4d15b8e..18177c2 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -120,6 +120,11 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Stopped super feed manager! " + this);
         }
+        started = false;
+    }
+
+    public boolean isStarted() {
+        return started;
     }
 
     @Override
@@ -212,6 +217,9 @@
                     String message = inbox.take();
                     os.write((message + EOM).getBytes());
                 }
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("Unsubscribed from " + feedId + " disconnected");
+                }
             } catch (IOException e) {
                 e.printStackTrace();
             } catch (InterruptedException e) {
@@ -265,6 +273,9 @@
                     e.printStackTrace();
                 }
             }
+            for (MessageListener listener : messageListeners) {
+                listener.stop();
+            }
             mesgAnalyzer.stop();
         }
 
@@ -348,8 +359,11 @@
                                 String[] comp = message.split("\\|");
                                 String parition = comp[3];
                                 String tput = comp[4];
+                                String location = comp[5];
                                 ingestionThroughputs.put(parition, tput);
-                                for (String tp : ingestionThroughputs.values()) {
+                                // throughput in partition order
+                                for (int i = 0; i < ingestionThroughputs.size(); i++) {
+                                    String tp = ingestionThroughputs.get(i + "");
                                     finalMessage.append(tp + "|");
                                 }
                                 q.add(finalMessage.toString());