Fixed Feed Connect Statement

This change includes two fixes:
1. Feed connect doesn't return until the connection is complete.
2. When using wait for completion, it waits until all the jobs
complete.

Change-Id: I416bf4917b1f5cea687d1202c435f7183136cf1f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/726
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
index d729680..04f20fb 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
@@ -32,15 +32,15 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.asterix.app.external.FeedLifecycleListener.Message;
+import org.apache.asterix.app.external.FeedLifecycleListener.FeedEvent;
 import org.apache.asterix.app.external.FeedWorkCollection.SubscribeFeedWork;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.external.feed.api.FeedOperationCounter;
 import org.apache.asterix.external.feed.api.IFeedJoint;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.api.IFeedJoint.State;
+import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
 import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.management.FeedId;
@@ -59,6 +59,7 @@
 import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
@@ -78,37 +79,43 @@
 
     private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName());
 
-    private final LinkedBlockingQueue<Message> inbox;
+    private final LinkedBlockingQueue<FeedEvent> inbox;
     private final Map<FeedConnectionId, List<IFeedLifecycleEventSubscriber>> eventSubscribers;
 
     private final Map<JobId, FeedJobInfo> jobInfos;
     private final Map<FeedId, FeedIntakeInfo> intakeJobInfos;
     private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
-    private final Map<FeedId, List<IFeedJoint>> feedPipeline;
+    private final Map<FeedId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline;
     private final Map<FeedConnectionId, Pair<IIntakeProgressTracker, Long>> feedIntakeProgressTrackers;
 
-    public FeedJobNotificationHandler(LinkedBlockingQueue<Message> inbox) {
+    public FeedJobNotificationHandler(LinkedBlockingQueue<FeedEvent> inbox) {
         this.inbox = inbox;
         this.jobInfos = new HashMap<JobId, FeedJobInfo>();
         this.intakeJobInfos = new HashMap<FeedId, FeedIntakeInfo>();
         this.connectJobInfos = new HashMap<FeedConnectionId, FeedConnectJobInfo>();
-        this.feedPipeline = new HashMap<FeedId, List<IFeedJoint>>();
+        this.feedPipeline = new HashMap<FeedId, Pair<FeedOperationCounter, List<IFeedJoint>>>();
         this.eventSubscribers = new HashMap<FeedConnectionId, List<IFeedLifecycleEventSubscriber>>();
         this.feedIntakeProgressTrackers = new HashMap<FeedConnectionId, Pair<IIntakeProgressTracker, Long>>();
     }
 
     @Override
     public void run() {
-        Message mesg;
+        FeedEvent event;
         while (true) {
             try {
-                mesg = inbox.take();
-                switch (mesg.messageKind) {
+                event = inbox.take();
+                switch (event.eventKind) {
                     case JOB_START:
-                        handleJobStartMessage(mesg);
+                        handleJobStartEvent(event);
                         break;
                     case JOB_FINISH:
-                        handleJobFinishMessage(mesg);
+                        handleJobFinishEvent(event);
+                        break;
+                    case PROVIDER_READY:
+                        handleProviderReady(event);
+                        break;
+                    default:
+                        LOGGER.log(Level.WARNING, "Unknown Feed Event");
                         break;
                 }
             } catch (Exception e) {
@@ -121,11 +128,11 @@
     public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId,
             IIntakeProgressTracker feedIntakeProgressTracker) {
         if (feedIntakeProgressTrackers.get(connectionId) == null) {
-            this.feedIntakeProgressTrackers.put(connectionId, new Pair<IIntakeProgressTracker, Long>(
-                    feedIntakeProgressTracker, 0L));
+            this.feedIntakeProgressTrackers.put(connectionId,
+                    new Pair<IIntakeProgressTracker, Long>(feedIntakeProgressTracker, 0L));
         } else {
-            throw new IllegalStateException(" Progress tracker for connection " + connectionId
-                    + " is alreader registered");
+            throw new IllegalStateException(
+                    " Progress tracker for connection " + connectionId + " is alreader registered");
         }
     }
 
@@ -149,29 +156,35 @@
         return connectJobInfos.values();
     }
 
-    public void registerFeedJoint(IFeedJoint feedJoint) {
-        List<IFeedJoint> feedJointsOnPipeline = feedPipeline.get(feedJoint.getOwnerFeedId());
+    public synchronized void registerFeedJoint(IFeedJoint feedJoint, int numOfPrividers) {
+        Pair<FeedOperationCounter, List<IFeedJoint>> feedJointsOnPipeline = feedPipeline
+                .get(feedJoint.getOwnerFeedId());
+
         if (feedJointsOnPipeline == null) {
-            feedJointsOnPipeline = new ArrayList<IFeedJoint>();
+            feedJointsOnPipeline = new Pair<FeedOperationCounter, List<IFeedJoint>>(
+                    new FeedOperationCounter(numOfPrividers, 1), new ArrayList<IFeedJoint>());
             feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline);
-            feedJointsOnPipeline.add(feedJoint);
+            feedJointsOnPipeline.second.add(feedJoint);
         } else {
-            if (!feedJointsOnPipeline.contains(feedJoint)) {
-                feedJointsOnPipeline.add(feedJoint);
+            if (!feedJointsOnPipeline.second.contains(feedJoint)) {
+                feedJointsOnPipeline.first.setJobsCount(feedJointsOnPipeline.first.getJobsCount() + 1);
+                feedJointsOnPipeline.second.add(feedJoint);
             } else {
                 throw new IllegalArgumentException("Feed joint " + feedJoint + " already registered");
             }
         }
     }
 
-    public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec) throws HyracksDataException {
+    public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec)
+            throws HyracksDataException {
         if (jobInfos.get(jobId) != null) {
             throw new IllegalStateException("Feed job already registered");
         }
 
-        List<IFeedJoint> joints = feedPipeline.get(feedId);
+        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.containsKey(feedId) ? feedPipeline.get(feedId)
+                : null;
         IFeedJoint intakeJoint = null;
-        for (IFeedJoint joint : joints) {
+        for (IFeedJoint joint : pair.second) {
             if (joint.getType().equals(IFeedJoint.FeedJointType.INTAKE)) {
                 intakeJoint = joint;
                 break;
@@ -181,6 +194,7 @@
         if (intakeJoint != null) {
             FeedIntakeInfo intakeJobInfo = new FeedIntakeInfo(jobId, FeedJobState.CREATED, FeedJobInfo.JobType.INTAKE,
                     feedId, intakeJoint, jobSpec);
+            pair.first.setFeedJobInfo(intakeJobInfo);
             intakeJobInfos.put(feedId, intakeJobInfo);
             jobInfos.put(jobId, intakeJobInfo);
 
@@ -188,8 +202,8 @@
                 LOGGER.info("Registered feed intake [" + jobId + "]" + " for feed " + feedId);
             }
         } else {
-            throw new HyracksDataException("Could not register feed intake job [" + jobId + "]" + " for feed  "
-                    + feedId);
+            throw new HyracksDataException(
+                    "Could not register feed intake job [" + jobId + "]" + " for feed  " + feedId);
         }
     }
 
@@ -199,7 +213,7 @@
             throw new IllegalStateException("Feed job already registered");
         }
 
-        List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId);
+        List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId).second;
         FeedConnectionId cid = null;
         IFeedJoint sourceFeedJoint = null;
         for (IFeedJoint joint : feedJoints) {
@@ -238,7 +252,7 @@
         intakeJobInfos.remove(info.getFeedId());
 
         if (!info.getState().equals(FeedJobState.UNDER_RECOVERY)) {
-            List<IFeedJoint> joints = feedPipeline.get(info.getFeedId());
+            List<IFeedJoint> joints = feedPipeline.get(info.getFeedId()).second;
             joints.remove(info.getIntakeFeedJoint());
 
             if (LOGGER.isLoggable(Level.INFO)) {
@@ -252,7 +266,7 @@
 
     }
 
-    private void handleJobStartMessage(Message message) throws Exception {
+    private void handleJobStartEvent(FeedEvent message) throws Exception {
         FeedJobInfo jobInfo = jobInfos.get(message.jobId);
         switch (jobInfo.getJobType()) {
             case INTAKE:
@@ -265,7 +279,7 @@
 
     }
 
-    private void handleJobFinishMessage(Message message) throws Exception {
+    private void handleJobFinishEvent(FeedEvent message) throws Exception {
         FeedJobInfo jobInfo = jobInfos.get(message.jobId);
         switch (jobInfo.getJobType()) {
             case INTAKE:
@@ -276,12 +290,22 @@
                 break;
             case FEED_CONNECT:
                 if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Collect Job finished for  " + (FeedConnectJobInfo) jobInfo);
+                    LOGGER.info("Collect Job finished for  " + jobInfo);
                 }
                 handleFeedCollectJobFinishMessage((FeedConnectJobInfo) jobInfo);
                 break;
         }
+    }
 
+    private void handleProviderReady(FeedEvent message) {
+        FeedIntakeInfo jobInfo = (FeedIntakeInfo) jobInfos.get(message.jobId);
+        Pair<FeedOperationCounter, List<IFeedJoint>> feedCounter = feedPipeline.get(message.feedId);
+        feedCounter.first.setProvidersCount(feedCounter.first.getProvidersCount() - 1);;
+        if (feedCounter.first.getProvidersCount() == 0) {
+            jobInfo.getIntakeFeedJoint().setState(State.ACTIVE);
+            jobInfo.setState(FeedJobState.ACTIVE);
+            notifyFeedEventSubscribers(jobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED);
+        }
     }
 
     private synchronized void handleIntakeJobStartMessage(FeedIntakeInfo intakeJobInfo) throws Exception {
@@ -306,19 +330,16 @@
         }
         // intakeLocations is an ordered list; element at position i corresponds to location of i'th instance of operator
         intakeJobInfo.setIntakeLocation(intakeLocations);
-        intakeJobInfo.getIntakeFeedJoint().setState(State.ACTIVE);
-        intakeJobInfo.setState(FeedJobState.ACTIVE);
-
-        // notify event listeners
-        notifyFeedEventSubscribers(intakeJobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED);
     }
 
     private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws RemoteException, ACIDException {
         // set locations of feed sub-operations (intake, compute, store)
         setLocations(cInfo);
 
+        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(cInfo.getConnectionId().getFeedId());
+        pair.first.setJobsCount(pair.first.getJobsCount() + 1);
         // activate joints
-        List<IFeedJoint> joints = feedPipeline.get(cInfo.getConnectionId().getFeedId());
+        List<IFeedJoint> joints = pair.second;
         for (IFeedJoint joint : joints) {
             if (joint.getProvider().equals(cInfo.getConnectionId())) {
                 joint.setState(State.ACTIVE);
@@ -328,7 +349,6 @@
             }
         }
         cInfo.setState(FeedJobState.ACTIVE);
-
         // register activity in metadata
         registerFeedActivity(cInfo);
         // notify event listeners
@@ -413,20 +433,25 @@
         return connectJobInfos.get(connectionId).getState();
     }
 
-    private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, Message message) throws Exception {
+    private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, FeedEvent message) throws Exception {
         IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
         JobInfo info = hcc.getJobInfo(message.jobId);
         JobStatus status = info.getStatus();
-        FeedLifecycleEvent event;
-        event = status.equals(JobStatus.FAILURE) ? FeedLifecycleEvent.FEED_INTAKE_FAILURE
-                : FeedLifecycleEvent.FEED_ENDED;
-
+        FeedId feedId = intakeInfo.getFeedId();
+        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId);
+        pair.first.setJobsCount(pair.first.getJobsCount() - 1);
+        if (status.equals(JobStatus.FAILURE)) {
+            pair.first.setFailedIngestion(true);
+        }
         // remove feed joints
         deregisterFeedIntakeJob(message.jobId);
 
         // notify event listeners
-        notifyFeedEventSubscribers(intakeInfo, event);
-
+        if (pair.first.getJobsCount() == 0) {
+            feedPipeline.remove(feedId);
+            notifyFeedEventSubscribers(intakeInfo, pair.first.isFailedIngestion()
+                    ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : FeedLifecycleEvent.FEED_ENDED);
+        }
     }
 
     private void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
@@ -446,7 +471,8 @@
             IFeedJoint feedJoint = cInfo.getSourceFeedJoint();
             feedJoint.removeReceiver(connectionId);
             if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
+                LOGGER.info(
+                        "Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
             }
             removeFeedJointsPostPipelineTermination(cInfo.getConnectionId());
         }
@@ -458,6 +484,15 @@
         }
         deregisterFeedActivity(cInfo);
 
+        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline
+                .get(cInfo.getSourceFeedJoint().getFeedJointKey().getFeedId());
+        pair.first.setJobsCount(pair.first.getJobsCount() - 1);
+        if (pair.first.getJobsCount() == 0) {
+            notifyFeedEventSubscribers(pair.first.getFeedJobInfo(), pair.first.isFailedIngestion()
+                    ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : FeedLifecycleEvent.FEED_ENDED);
+            feedPipeline.remove(cInfo.getSourceFeedJoint().getFeedJointKey().getFeedId());
+        }
+
         // notify event listeners
         FeedLifecycleEvent event = failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_ENDED;
         notifyFeedEventSubscribers(cInfo, event);
@@ -486,11 +521,11 @@
 
         feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_CONNECT_TIMESTAMP, (new Date()).toString());
         try {
-            FeedActivity feedActivity = new FeedActivity(cInfo.getConnectionId().getFeedId().getDataverse(), cInfo
-                    .getConnectionId().getFeedId().getFeedName(), cInfo.getConnectionId().getDatasetName(),
+            FeedActivity feedActivity = new FeedActivity(cInfo.getConnectionId().getFeedId().getDataverse(),
+                    cInfo.getConnectionId().getFeedId().getFeedName(), cInfo.getConnectionId().getDatasetName(),
                     feedActivityDetails);
-            CentralFeedManager.getInstance().getFeedLoadManager()
-                    .reportFeedActivity(cInfo.getConnectionId(), feedActivity);
+            CentralFeedManager.getInstance().getFeedLoadManager().reportFeedActivity(cInfo.getConnectionId(),
+                    feedActivity);
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -514,7 +549,7 @@
 
     public void removeFeedJointsPostPipelineTermination(FeedConnectionId connectionId) {
         FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
-        List<IFeedJoint> feedJoints = feedPipeline.get(connectionId.getFeedId());
+        List<IFeedJoint> feedJoints = feedPipeline.get(connectionId.getFeedId()).second;
 
         IFeedJoint sourceJoint = cInfo.getSourceFeedJoint();
         List<FeedConnectionId> all = sourceJoint.getReceivers();
@@ -534,7 +569,7 @@
     }
 
     public List<String> getFeedComputeLocations(FeedId feedId) {
-        List<IFeedJoint> feedJoints = feedPipeline.get(feedId);
+        List<IFeedJoint> feedJoints = feedPipeline.get(feedId).second;
         for (IFeedJoint joint : feedJoints) {
             if (joint.getFeedJointKey().getFeedId().equals(feedId)) {
                 return connectJobInfos.get(joint.getProvider()).getComputeLocations();
@@ -578,7 +613,8 @@
     //============================
 
     public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
-        List<IFeedJoint> joints = feedPipeline.get(feedJointKey.getFeedId());
+        List<IFeedJoint> joints = feedPipeline.containsKey(feedJointKey.getFeedId())
+                ? feedPipeline.get(feedJointKey.getFeedId()).second : null;
         if (joints != null && !joints.isEmpty()) {
             for (IFeedJoint joint : joints) {
                 if (joint.getFeedJointKey().equals(feedJointKey)) {
@@ -598,7 +634,8 @@
     }
 
     public IFeedJoint getFeedJoint(FeedJointKey feedPointKey) {
-        List<IFeedJoint> joints = feedPipeline.get(feedPointKey.getFeedId());
+        List<IFeedJoint> joints = feedPipeline.containsKey(feedPointKey.getFeedId())
+                ? feedPipeline.get(feedPointKey.getFeedId()).second : null;
         if (joints != null && !joints.isEmpty()) {
             for (IFeedJoint joint : joints) {
                 if (joint.getFeedJointKey().equals(feedPointKey)) {
@@ -615,7 +652,8 @@
             return feedJoint;
         } else {
             String jointKeyString = feedJointKey.getStringRep();
-            List<IFeedJoint> jointsOnPipeline = feedPipeline.get(feedJointKey.getFeedId());
+            List<IFeedJoint> jointsOnPipeline = feedPipeline.containsKey(feedJointKey.getFeedId())
+                    ? feedPipeline.get(feedJointKey.getFeedId()).second : null;
             IFeedJoint candidateJoint = null;
             if (jointsOnPipeline != null) {
                 for (IFeedJoint joint : jointsOnPipeline) {
@@ -638,7 +676,7 @@
     }
 
     public IFeedJoint getFeedPoint(FeedId sourceFeedId, IFeedJoint.FeedJointType type) {
-        List<IFeedJoint> joints = feedPipeline.get(sourceFeedId);
+        List<IFeedJoint> joints = feedPipeline.get(sourceFeedId).second;
         for (IFeedJoint joint : joints) {
             if (joint.getType().equals(type)) {
                 return joint;
diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
index 8e44af4..d7129b8 100644
--- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
+++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
@@ -88,7 +88,7 @@
     public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener();
     private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
 
-    private final LinkedBlockingQueue<Message> jobEventInbox;
+    private final LinkedBlockingQueue<FeedEvent> jobEventInbox;
     private final LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox;
     private final Map<FeedCollectInfo, List<String>> dependentFeeds = new HashMap<FeedCollectInfo, List<String>>();
     private final Map<FeedConnectionId, LinkedBlockingQueue<String>> feedReportQueue;
@@ -99,7 +99,7 @@
     private ClusterState state;
 
     private FeedLifecycleListener() {
-        this.jobEventInbox = new LinkedBlockingQueue<Message>();
+        this.jobEventInbox = new LinkedBlockingQueue<FeedEvent>();
         this.feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox);
         this.responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>();
         this.feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox);
@@ -114,14 +114,14 @@
     @Override
     public void notifyJobStart(JobId jobId) throws HyracksException {
         if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
-            jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_START));
+            jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.JOB_START));
         }
     }
 
     @Override
     public void notifyJobFinish(JobId jobId) throws HyracksException {
         if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
-            jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_FINISH));
+            jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.JOB_FINISH));
         } else {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
@@ -178,19 +178,26 @@
         return feedJobNotificationHandler.getFeedJobState(connectionId);
     }
 
-    public static class Message {
+    public static class FeedEvent {
         public JobId jobId;
+        public FeedId feedId;
 
-        public enum MessageKind {
+        public enum EventKind {
             JOB_START,
-            JOB_FINISH
+            JOB_FINISH,
+            PROVIDER_READY
         }
 
-        public MessageKind messageKind;
+        public EventKind eventKind;
 
-        public Message(JobId jobId, MessageKind msgKind) {
+        public FeedEvent(JobId jobId, EventKind eventKind) {
+            this(jobId, eventKind, null);
+        }
+
+        public FeedEvent(JobId jobId, EventKind eventKind, FeedId feedId) {
             this.jobId = jobId;
-            this.messageKind = msgKind;
+            this.eventKind = eventKind;
+            this.feedId = feedId;
         }
     }
 
@@ -469,8 +476,8 @@
         return feedJobNotificationHandler.isFeedPointAvailable(feedJointKey);
     }
 
-    public void registerFeedJoint(IFeedJoint feedJoint) {
-        feedJobNotificationHandler.registerFeedJoint(feedJoint);
+    public void registerFeedJoint(IFeedJoint feedJoint, int numOfPrividers) {
+        feedJobNotificationHandler.registerFeedJoint(feedJoint, numOfPrividers);
     }
 
     public IFeedJoint getFeedJoint(FeedJointKey feedJointKey) {
@@ -496,4 +503,8 @@
         return feedJobNotificationHandler.getFeedCollectJobId(connectionId);
     }
 
+    public void notifyProviderReady(FeedId feedId, JobId jobId) {
+        jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.PROVIDER_READY, feedId));
+    }
+
 }
diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 213b090..67bf3bb 100644
--- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -26,7 +26,6 @@
 import java.io.InputStreamReader;
 import java.rmi.RemoteException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -2205,8 +2204,9 @@
                         metadataProvider, policyAccessor);
                 //adapter configuration are valid at this stage
                 //register the feed joints (these are auto-de-registered)
+                int numOfPrividers = pair.second.getPartitionConstraint().getLocations().length;
                 for (IFeedJoint fj : triple.third) {
-                    FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);
+                    FeedLifecycleListener.INSTANCE.registerFeedJoint(fj, numOfPrividers);
                 }
                 JobUtils.runJob(hcc, pair.first, false);
                 /*
@@ -2220,7 +2220,7 @@
                 eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
             } else {
                 for (IFeedJoint fj : triple.third) {
-                    FeedLifecycleListener.INSTANCE.registerFeedJoint(fj);
+                    FeedLifecycleListener.INSTANCE.registerFeedJoint(fj, 0);
                 }
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 258bc35..2378032 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -25,9 +25,10 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.app.external.FeedLifecycleListener;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
 import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
 import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
 import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
 import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
@@ -36,6 +37,7 @@
 import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.external.feed.message.FeedProviderReadyMessage;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -79,12 +81,20 @@
             case COMPLETE_FAILBACK_RESPONSE:
                 handleCompleteFailbcakResponse(message);
                 break;
+            case FEED_PROVIDER_READY:
+                handleFeedProviderReady(message);
+                break;
             default:
                 LOGGER.warning("Unknown message: " + absMessage.getMessageType());
                 break;
         }
     }
 
+    private void handleFeedProviderReady(IMessage message) {
+        FeedProviderReadyMessage msg = (FeedProviderReadyMessage) message;
+        FeedLifecycleListener.INSTANCE.notifyProviderReady(msg.getFeedId(), msg.getJobId());
+    }
+
     private synchronized void handleResourceIdRequest(IMessage message, String nodeId) throws Exception {
         ResourceIdRequestMessage msg = (ResourceIdRequestMessage) message;
         ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.query.aql
similarity index 92%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.query.aql
index 864ce01..b696974 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.query.aql
@@ -23,7 +23,7 @@
  */
 use dataverse KeyVerse;
 
-for $d in dataset KVStore
-order by meta().id
-limit 5
-return meta().id;
+count(
+    for $d in dataset KVStore
+    return $d
+);
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.sleep.aql
deleted file mode 100644
index db6954e..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.sleep.aql
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Create a change feed and test ingestion of records
- * Expected Res : Success
- * Date         : 24th Feb 2016
- */
-4000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql
index b696974..28e9a15 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql
@@ -24,6 +24,7 @@
 use dataverse KeyVerse;
 
 count(
-    for $d in dataset KVStore
-    return $d
+for $d in dataset KVStore
+distinct by meta()."key"
+return 1
 );
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql
index 28e9a15..7423399 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql
@@ -23,8 +23,12 @@
  */
 use dataverse KeyVerse;
 
-count(
+
 for $d in dataset KVStore
-distinct by meta()."key"
-return 1
-);
+group by $vb := meta().vbucket with $d
+order by $vb
+limit 5
+return {
+   "vbucket": $vb,
+   "tuple_count": count($d)
+};
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.6.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.ddl.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.6.ddl.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.ddl.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.query.aql
deleted file mode 100644
index 7423399..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.query.aql
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Create a change feed and test ingestion of records
- * Expected Res : Success
- * Date         : 24th Feb 2016
- */
-use dataverse KeyVerse;
-
-
-for $d in dataset KVStore
-group by $vb := meta().vbucket with $d
-order by $vb
-limit 5
-return {
-   "vbucket": $vb,
-   "tuple_count": count($d)
-};
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.7.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.7.ddl.aql
deleted file mode 100644
index d282e66..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.7.ddl.aql
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Create a change feed and test ingestion of records
- * Expected Res : Success
- * Date         : 24th Feb 2016
- */
-drop dataverse KeyVerse;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
index 4d2f9c4..4c1635f 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql
@@ -26,3 +26,4 @@
 set wait-for-completion-feed "false";
 
 connect feed TweetFeed to dataset Tweets;
+disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.ddl.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.ddl.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
deleted file mode 100644
index e70df33..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Drop a dataverse with disconnected feed
- * Expected Res : Success
- * Date         : 24th Feb 2016
- */
-3000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
deleted file mode 100644
index 34d6285..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Drop a dataverse with disconnected feed
- * Expected Res : Success
- * Date         : 24th Feb 2016
- */
-
-use dataverse experiments;
-disconnect feed TweetFeed from dataset Tweets;
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.server.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.server.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql
deleted file mode 100644
index eb18795..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Create a socket feed with a client that pushes
- * 10 records. The feed is connected to a dataset that is then
- * queried for the data.
- * Expected Res : Success
- * Date         : 24th Feb 2016
- */
-3000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.sleep.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.sleep.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.update.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.update.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.query.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.query.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.server.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.server.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.ddl.aql
similarity index 100%
rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql
rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.ddl.aql
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.query.aql
similarity index 92%
copy from asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql
copy to asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.query.aql
index 864ce01..b696974 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.query.aql
@@ -23,7 +23,7 @@
  */
 use dataverse KeyVerse;
 
-for $d in dataset KVStore
-order by meta().id
-limit 5
-return meta().id;
+count(
+    for $d in dataset KVStore
+    return $d
+);
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.sleep.aql
deleted file mode 100644
index db6954e..0000000
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.sleep.aql
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/*
- * Description  : Create a change feed and test ingestion of records
- * Expected Res : Success
- * Date         : 24th Feb 2016
- */
-4000
\ No newline at end of file
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql
index b696974..864ce01 100644
--- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql
+++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql
@@ -23,7 +23,7 @@
  */
 use dataverse KeyVerse;
 
-count(
-    for $d in dataset KVStore
-    return $d
-);
+for $d in dataset KVStore
+order by meta().id
+limit 5
+return meta().id;
diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.6.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.ddl.aql
similarity index 100%
copy from asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.6.ddl.aql
copy to asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.ddl.aql
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 5d2e263..fba74e8 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -35,7 +35,8 @@
         PREPARE_PARTITIONS_FAILBACK_RESPONSE,
         COMPLETE_FAILBACK_REQUEST,
         COMPLETE_FAILBACK_RESPONSE,
-        REPLICA_EVENT
+        REPLICA_EVENT,
+        FEED_PROVIDER_READY
     }
 
     public abstract ApplicationMessageType getMessageType();
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
new file mode 100644
index 0000000..7b74ef9
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.external.feed.watch.FeedJobInfo;
+
+public class FeedOperationCounter {
+    private FeedJobInfo feedJobInfo;
+    private int providersCount;
+    private int jobsCount;
+    private boolean failedIngestion = false;
+
+    public FeedOperationCounter(int providersCount, int jobsCount) {
+        this.providersCount = providersCount;
+        this.jobsCount = jobsCount;
+    }
+
+    public int getProvidersCount() {
+        return providersCount;
+    }
+
+    public void setProvidersCount(int providersCount) {
+        this.providersCount = providersCount;
+    }
+
+    public int getJobsCount() {
+        return jobsCount;
+    }
+
+    public void setJobsCount(int jobsCount) {
+        this.jobsCount = jobsCount;
+    }
+
+    public boolean isFailedIngestion() {
+        return failedIngestion;
+    }
+
+    public void setFailedIngestion(boolean failedIngestion) {
+        this.failedIngestion = failedIngestion;
+    }
+
+    public FeedJobInfo getFeedJobInfo() {
+        return feedJobInfo;
+    }
+
+    public void setFeedJobInfo(FeedJobInfo feedJobInfo) {
+        this.feedJobInfo = feedJobInfo;
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
new file mode 100644
index 0000000..4c81c5b
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.hyracks.api.job.JobId;
+
+public class FeedProviderReadyMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final FeedId feedId;
+    private final JobId jobId;
+
+    public FeedProviderReadyMessage(FeedId feedId, JobId jobId) {
+        this.feedId = feedId;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.FEED_PROVIDER_READY;
+    }
+
+    public FeedId getFeedId() {
+        return feedId;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index d0348c2..8475e45 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -147,22 +147,7 @@
     }
 
     private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) {
-        int waitCycleCount = 0;
-        ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
-        while (ingestionRuntime == null && waitCycleCount < 1000) {
-            try {
-                Thread.sleep(3000);
-                waitCycleCount++;
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId);
-                }
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-                break;
-            }
-            ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
-        }
-        return (IngestionRuntime) ingestionRuntime;
+        return (IngestionRuntime) subscriptionManager.getSubscribableRuntime(subscribableRuntimeId);
     }
 
     public ConnectionLocation getSubscriptionLocation() {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index c1748d9..6771010 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -36,6 +36,7 @@
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.message.FeedProviderReadyMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
@@ -115,6 +116,9 @@
                 ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc,
                         adapterRuntimeManager, ctx);
                 feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
+                // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests.
+                ctx.sendApplicationMessageToCC(new FeedProviderReadyMessage(feedId, ctx.getJobletContext().getJobId()),
+                        null);
                 feedFrameWriter.open();
             } else {
                 if (ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) {