fixed issue related to retrieval of information for active feeds post cluster shutdown
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index ade794e..b806be0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -18,8 +18,10 @@
 import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
@@ -84,7 +86,6 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -1369,6 +1370,7 @@
     public List<FeedActivity> getActiveFeeds(JobId jobId, String dataverse, String dataset) throws MetadataException,
             RemoteException {
         List<FeedActivity> activeFeeds = new ArrayList<FeedActivity>();
+        Map<FeedConnectionId, FeedActivity> aFeeds = new HashMap<FeedConnectionId, FeedActivity>();
         try {
             ITupleReference searchKey = createTuple();
             FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
@@ -1389,14 +1391,21 @@
                     case FEED_RESUME:
                     case FEED_BEGIN:
                         if (!terminatedFeeds.contains(fid)) {
-                            activeFeeds.add(fa);
+                            if (aFeeds.get(fid) == null) {
+                                aFeeds.put(fid, fa);
+                            }
                         }
                         break;
                     case FEED_END:
                         terminatedFeeds.add(fid);
                         break;
+                    default: //ignore    
                 }
             }
+            for (FeedActivity f : aFeeds.values()) {
+                System.out.println("ACTIVE FEEDS " + f.getFeedName());
+                activeFeeds.add(f);
+            }
             return activeFeeds;
         } catch (Exception e) {
             throw new MetadataException(e);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
index c4f84e5..639d5d1 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/FeedActivity.java
@@ -98,16 +98,36 @@
         if (!(other instanceof FeedActivity)) {
             return false;
         }
-        FeedActivity otherDataset = (FeedActivity) other;
-        if (!otherDataset.dataverseName.equals(dataverseName)) {
+
+        if (!((FeedActivity) other).dataverseName.equals(dataverseName)) {
             return false;
         }
-        if (!otherDataset.datasetName.equals(datasetName)) {
+        if (!((FeedActivity) other).datasetName.equals(datasetName)) {
             return false;
         }
+        if (!((FeedActivity) other).getFeedName().equals(feedName)) {
+            return false;
+        }
+        if (!((FeedActivity) other).getFeedActivityType().equals(activityType)) {
+            return false;
+        }
+        if (((FeedActivity) other).getActivityId() != (activityId)) {
+            return false;
+        }
+
         return true;
     }
 
+    @Override
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return dataverseName + "." + feedName + " --> " + datasetName + " " + activityType + " " + activityId;
+    }
+
     public FeedActivityType getFeedActivityType() {
         return activityType;
     }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
index c84560a..3a481f3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedFrameWriter.java
@@ -9,13 +9,16 @@
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
+import edu.uci.ics.asterix.metadata.feeds.SuperFeedManager.FeedReportMessageType;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 
 public class FeedFrameWriter implements IFrameWriter {
 
@@ -53,6 +56,8 @@
 
     private ExecutorService executorService;
 
+    private FrameTupleAccessor fta;
+
     public enum Mode {
         FORWARD,
         STORE
@@ -60,7 +65,7 @@
 
     public FeedFrameWriter(IFrameWriter writer, IOperatorNodePushable nodePushable, FeedConnectionId feedId,
             FeedPolicyEnforcer policyEnforcer, String nodeId, FeedRuntimeType feedRuntimeType, int partition,
-            ExecutorService executorService) {
+            ExecutorService executorService, FrameTupleAccessor fta) {
         this.writer = writer;
         this.mode = Mode.FORWARD;
         this.nodePushable = nodePushable;
@@ -76,11 +81,11 @@
             executorService.execute(task);
             sfm = FeedManager.INSTANCE.getSuperFeedManager(feedId);
             framePushWait = new FramePushWait(nodePushable, FLUSH_THRESHOLD_TIME, sfm, feedId, nodeId, feedRuntimeType,
-                    partition);
+                    partition, FLUSH_THRESHOLD_TIME);
             Timer timer = new Timer();
             timer.scheduleAtFixedRate(framePushWait, 0, FLUSH_THRESHOLD_TIME);
         }
-
+        this.fta = fta;
     }
 
     public Mode getMode() {
@@ -128,9 +133,10 @@
             case FORWARD:
                 try {
                     if (collectStatistics) {
+                        fta.reset(buffer);
                         framePushWait.notifyStart();
                         writer.nextFrame(buffer);
-                        framePushWait.notifyFinish();
+                        framePushWait.notifyFinish(fta.getTupleCount());
                     } else {
                         writer.nextFrame(buffer);
                     }
@@ -171,9 +177,12 @@
         private String nodeId;
         private FeedRuntimeType feedRuntimeType;
         private int partition;
+        private AtomicLong numTuplesInInterval = new AtomicLong(0);
+        private long period;
+        private boolean collectThroughput;
 
         public FramePushWait(IOperatorNodePushable nodePushable, long flushThresholdTime, SuperFeedManager sfm,
-                FeedConnectionId feedId, String nodeId, FeedRuntimeType feedRuntimeType, int partition) {
+                FeedConnectionId feedId, String nodeId, FeedRuntimeType feedRuntimeType, int partition, long period) {
             this.nodePushable = nodePushable;
             this.flushThresholdTime = flushThresholdTime;
             this.state = State.INTIALIZED;
@@ -182,15 +191,19 @@
             this.nodeId = nodeId;
             this.feedRuntimeType = feedRuntimeType;
             this.partition = partition;
+            this.period = period;
+            this.collectThroughput = feedRuntimeType.equals(FeedRuntimeType.INGESTION);
         }
 
         public void notifyStart() {
             startTime = System.currentTimeMillis();
             state = State.WAITING_FOR_FLUSH_COMPLETION;
+
         }
 
-        public void notifyFinish() {
+        public void notifyFinish(int numTuples) {
             state = State.WAITNG_FOR_NEXT_FRAME;
+            numTuplesInInterval.set(numTuplesInInterval.get() + numTuples);
         }
 
         @Override
@@ -204,12 +217,29 @@
                     reportCongestionToSFM(currentTime - startTime);
                 }
             }
+            if (collectThroughput) {
+                int instantTput = (int) ((numTuplesInInterval.get() * 1000) / period);
+                System.out.println("Instantaneous throughput " + instantTput + " (" + feedRuntimeType + "[" + partition
+                        + "]" + ")");
+                reportThroughputToSFM(instantTput);
+            }
+            numTuplesInInterval.set(0);
         }
 
         private void reportCongestionToSFM(long waitingTime) {
+            sendReportToSFM(waitingTime, FeedReportMessageType.CONGESTION);
+        }
+
+        private void reportThroughputToSFM(long throughput) {
+            sendReportToSFM(throughput, FeedReportMessageType.THROUGHPUT);
+        }
+
+        private void sendReportToSFM(long value, SuperFeedManager.FeedReportMessageType mesgType) {
             String feedRep = feedId.getDataverse() + ":" + feedId.getFeedName() + ":" + feedId.getDatasetName();
             String operator = "" + feedRuntimeType;
-            String mesg = feedRep + "|" + operator + "|" + partition + "|" + waitingTime + "|" + EOL;
+            String mesg = mesgType.name().toLowerCase() + "|" + feedRep + "|" + operator + "|" + partition + "|"
+                    + value + "|" + EOL;
+
             Socket sc = null;
             try {
                 while (sfm == null) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
index 859990c..8f3a9b6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedIntakeOperatorNodePushable.java
@@ -25,6 +25,7 @@
 import edu.uci.ics.asterix.metadata.feeds.FeedRuntime.FeedRuntimeType;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 /**
@@ -42,6 +43,7 @@
     private final FeedPolicyEnforcer policyEnforcer;
     private FeedRuntime ingestionRuntime;
     private final String nodeId;
+    private FrameTupleAccessor fta;
 
     public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, FeedConnectionId feedId, IFeedAdapter adapter,
             Map<String, String> feedPolicy, int partition, IngestionRuntime ingestionRuntime) {
@@ -53,19 +55,20 @@
         this.feedPolicy = feedPolicy;
         policyEnforcer = new FeedPolicyEnforcer(feedId, feedPolicy);
         nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
+        fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+
     }
 
     @Override
     public void initialize() throws HyracksDataException {
 
         AdapterRuntimeManager adapterRuntimeMgr = null;
-        System.out.println("FEED INGESTION RUNTIME CALLED FOR " + partition);
         try {
             if (ingestionRuntime == null) {
                 ingestionRuntime = new IngestionRuntime(feedId, partition, FeedRuntimeType.INGESTION, adapterRuntimeMgr);
                 ExecutorService executorService = FeedManager.INSTANCE.registerFeedRuntime(ingestionRuntime);
                 FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId,
-                        FeedRuntimeType.INGESTION, partition, executorService);
+                        FeedRuntimeType.INGESTION, partition, executorService, fta);
                 adapterRuntimeMgr = new AdapterRuntimeManager(feedId, adapter, mWriter, partition, inbox);
 
                 if (adapter instanceof AbstractFeedDatasourceAdapter) {
@@ -75,7 +78,6 @@
                     LOGGER.info("Beginning new feed:" + feedId);
                 }
                 mWriter.open();
-                System.out.println("STARTING FEED INGESTION RUNTIME FOR " + partition);
                 adapterRuntimeMgr.start();
             } else {
                 adapterRuntimeMgr = ((IngestionRuntime) ingestionRuntime).getAdapterRuntimeManager();
@@ -85,7 +87,6 @@
                 adapter = adapterRuntimeMgr.getFeedAdapter();
                 writer.open();
                 adapterRuntimeMgr.getAdapterExecutor().setWriter(writer);
-                System.out.println("RESUMED FEED INGESTION RUNTIME FOR " + partition);
                 adapterRuntimeMgr.setState(State.ACTIVE_INGESTION);
             }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
index 261585d..cdc4d2a 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedMetaOperatorDescriptor.java
@@ -2,8 +2,6 @@
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -16,8 +14,10 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
@@ -69,6 +69,7 @@
         private boolean resumeOldState;
         private ExecutorService feedExecService;
         private String nodeId;
+        private FrameTupleAccessor fta;
 
         public FeedMetaNodePushable(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
                 int partition, int nPartitions, IOperatorDescriptor coreOperator, FeedConnectionId feedConnectionId,
@@ -80,6 +81,7 @@
             this.runtimeType = runtimeType;
             this.feedId = feedConnectionId;
             this.nodeId = ctx.getJobletContext().getApplicationContext().getNodeId();
+            fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
         }
 
         @Override
@@ -102,7 +104,7 @@
                 resumeOldState = true;
             }
             FeedFrameWriter mWriter = new FeedFrameWriter(writer, this, feedId, policyEnforcer, nodeId, runtimeType,
-                    partition, feedExecService);
+                    partition, feedExecService, fta);
             coreOperatorNodePushable.setOutputFrameWriter(0, mWriter, recordDesc);
             coreOperatorNodePushable.open();
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
index dc484a7..2f499e6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/SuperFeedManager.java
@@ -48,6 +48,11 @@
 
     private boolean isLocal = false;
 
+    public enum FeedReportMessageType {
+        CONGESTION,
+        THROUGHPUT
+    }
+
     public SuperFeedManager(FeedConnectionId feedId, String nodeId, int port) throws Exception {
         this.feedConnectionId = feedId;
         this.nodeId = nodeId;
@@ -157,9 +162,15 @@
                 while (true) {
                     try {
                         String message = messages.take();
-                        String[] msgComp = message.split("|");
+                        String[] messageComponents = message.split("|");
+                        SuperFeedManager.FeedReportMessageType mesgType = FeedReportMessageType
+                                .valueOf(messageComponents[0]);
+                        switch (mesgType) {
+                            case THROUGHPUT:
+                            case CONGESTION:
+                        }
                         if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Congestion Reported" + message);
+                            LOGGER.warning(message);
                         }
                     } catch (InterruptedException ie) {
                         throw ie;