Asterix-1389 Fix Deadlocks in Feed Connections

This change ensures the completion of the Feed connect and feed
disconnect statement each as an atomic operation.
Previously, we assumed that with the intake ready on all nodes
and the connect started, the connect is complete. That is not
true. In order for the connect to be complete, we need to ensure
that the connect subscribe to the intake in all intake nodes.
Likewise, the disconnect shouldn't return until the connect
job terminates.

Change-Id: Ib2778b4d7f156c7e06ac9f561a26783c4933a22c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/792
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
index 04f20fb..a143578 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
@@ -81,7 +81,6 @@
 
     private final LinkedBlockingQueue<FeedEvent> inbox;
     private final Map<FeedConnectionId, List<IFeedLifecycleEventSubscriber>> eventSubscribers;
-
     private final Map<JobId, FeedJobInfo> jobInfos;
     private final Map<FeedId, FeedIntakeInfo> intakeJobInfos;
     private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
@@ -100,7 +99,8 @@
 
     @Override
     public void run() {
-        FeedEvent event;
+        FeedEvent event = null;
+        Thread.currentThread().setName("FeedJobNotificationHandler");
         while (true) {
             try {
                 event = inbox.take();
@@ -111,8 +111,8 @@
                     case JOB_FINISH:
                         handleJobFinishEvent(event);
                         break;
-                    case PROVIDER_READY:
-                        handleProviderReady(event);
+                    case PARTITION_START:
+                        handlePartitionStart(event);
                         break;
                     default:
                         LOGGER.log(Level.WARNING, "Unknown Feed Event");
@@ -121,7 +121,6 @@
             } catch (Exception e) {
                 e.printStackTrace();
             }
-
         }
     }
 
@@ -162,12 +161,11 @@
 
         if (feedJointsOnPipeline == null) {
             feedJointsOnPipeline = new Pair<FeedOperationCounter, List<IFeedJoint>>(
-                    new FeedOperationCounter(numOfPrividers, 1), new ArrayList<IFeedJoint>());
+                    new FeedOperationCounter(numOfPrividers), new ArrayList<IFeedJoint>());
             feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline);
             feedJointsOnPipeline.second.add(feedJoint);
         } else {
             if (!feedJointsOnPipeline.second.contains(feedJoint)) {
-                feedJointsOnPipeline.first.setJobsCount(feedJointsOnPipeline.first.getJobsCount() + 1);
                 feedJointsOnPipeline.second.add(feedJoint);
             } else {
                 throw new IllegalArgumentException("Feed joint " + feedJoint + " already registered");
@@ -175,7 +173,7 @@
         }
     }
 
-    public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec)
+    public synchronized void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec)
             throws HyracksDataException {
         if (jobInfos.get(jobId) != null) {
             throw new IllegalStateException("Feed job already registered");
@@ -207,7 +205,7 @@
         }
     }
 
-    public void registerFeedCollectionJob(FeedId sourceFeedId, FeedConnectionId connectionId, JobId jobId,
+    public synchronized void registerFeedCollectionJob(FeedId sourceFeedId, FeedConnectionId connectionId, JobId jobId,
             JobSpecification jobSpec, Map<String, String> feedPolicy) {
         if (jobInfos.get(jobId) != null) {
             throw new IllegalStateException("Feed job already registered");
@@ -242,7 +240,7 @@
 
     }
 
-    public void deregisterFeedIntakeJob(JobId jobId) {
+    public synchronized void deregisterFeedIntakeJob(JobId jobId) {
         if (jobInfos.get(jobId) == null) {
             throw new IllegalStateException(" Feed Intake job not registered ");
         }
@@ -266,7 +264,7 @@
 
     }
 
-    private void handleJobStartEvent(FeedEvent message) throws Exception {
+    private synchronized void handleJobStartEvent(FeedEvent message) throws Exception {
         FeedJobInfo jobInfo = jobInfos.get(message.jobId);
         switch (jobInfo.getJobType()) {
             case INTAKE:
@@ -279,7 +277,7 @@
 
     }
 
-    private void handleJobFinishEvent(FeedEvent message) throws Exception {
+    private synchronized void handleJobFinishEvent(FeedEvent message) throws Exception {
         FeedJobInfo jobInfo = jobInfos.get(message.jobId);
         switch (jobInfo.getJobType()) {
             case INTAKE:
@@ -297,14 +295,27 @@
         }
     }
 
-    private void handleProviderReady(FeedEvent message) {
-        FeedIntakeInfo jobInfo = (FeedIntakeInfo) jobInfos.get(message.jobId);
-        Pair<FeedOperationCounter, List<IFeedJoint>> feedCounter = feedPipeline.get(message.feedId);
-        feedCounter.first.setProvidersCount(feedCounter.first.getProvidersCount() - 1);;
-        if (feedCounter.first.getProvidersCount() == 0) {
-            jobInfo.getIntakeFeedJoint().setState(State.ACTIVE);
-            jobInfo.setState(FeedJobState.ACTIVE);
-            notifyFeedEventSubscribers(jobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED);
+    private synchronized void handlePartitionStart(FeedEvent message) {
+        FeedJobInfo jobInfo = jobInfos.get(message.jobId);
+        switch (jobInfo.getJobType()) {
+            case FEED_CONNECT:
+                ((FeedConnectJobInfo) jobInfo).partitionStart();
+                if (((FeedConnectJobInfo) jobInfo).collectionStarted()) {
+                    notifyFeedEventSubscribers(jobInfo, FeedLifecycleEvent.FEED_COLLECT_STARTED);
+                }
+                break;
+            case INTAKE:
+                Pair<FeedOperationCounter, List<IFeedJoint>> feedCounter = feedPipeline.get(message.feedId);
+                feedCounter.first.setPartitionCount(feedCounter.first.getPartitionCount() - 1);;
+                if (feedCounter.first.getPartitionCount() == 0) {
+                    ((FeedIntakeInfo) jobInfo).getIntakeFeedJoint().setState(State.ACTIVE);
+                    jobInfo.setState(FeedJobState.ACTIVE);
+                    notifyFeedEventSubscribers(jobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED);
+                }
+                break;
+            default:
+                break;
+
         }
     }
 
@@ -335,9 +346,7 @@
     private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws RemoteException, ACIDException {
         // set locations of feed sub-operations (intake, compute, store)
         setLocations(cInfo);
-
         Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(cInfo.getConnectionId().getFeedId());
-        pair.first.setJobsCount(pair.first.getJobsCount() + 1);
         // activate joints
         List<IFeedJoint> joints = pair.second;
         for (IFeedJoint joint : joints) {
@@ -351,8 +360,6 @@
         cInfo.setState(FeedJobState.ACTIVE);
         // register activity in metadata
         registerFeedActivity(cInfo);
-        // notify event listeners
-        notifyFeedEventSubscribers(cInfo, FeedLifecycleEvent.FEED_COLLECT_STARTED);
     }
 
     private void notifyFeedEventSubscribers(FeedJobInfo jobInfo, FeedLifecycleEvent event) {
@@ -376,6 +383,9 @@
                     subscriber.handleFeedEvent(event);
                 }
             }
+            if (event == FeedLifecycleEvent.FEED_COLLECT_ENDED) {
+                eventSubscribers.remove(connId);
+            }
         }
     }
 
@@ -416,12 +426,17 @@
         return activeConnections;
     }
 
-    public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
+    public synchronized boolean isFeedConnectionActive(FeedConnectionId connectionId,
+            IFeedLifecycleEventSubscriber eventSubscriber) {
+        boolean active = false;
         FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
         if (cInfo != null) {
-            return cInfo.getState().equals(FeedJobState.ACTIVE);
+            active = cInfo.getState().equals(FeedJobState.ACTIVE);
         }
-        return false;
+        if (active) {
+            registerFeedEventSubscriber(connectionId, eventSubscriber);
+        }
+        return active;
     }
 
     public void setJobState(FeedConnectionId connectionId, FeedJobState jobState) {
@@ -433,28 +448,26 @@
         return connectJobInfos.get(connectionId).getState();
     }
 
-    private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, FeedEvent message) throws Exception {
+    private synchronized void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, FeedEvent message)
+            throws Exception {
         IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
         JobInfo info = hcc.getJobInfo(message.jobId);
         JobStatus status = info.getStatus();
         FeedId feedId = intakeInfo.getFeedId();
         Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId);
-        pair.first.setJobsCount(pair.first.getJobsCount() - 1);
         if (status.equals(JobStatus.FAILURE)) {
             pair.first.setFailedIngestion(true);
         }
         // remove feed joints
         deregisterFeedIntakeJob(message.jobId);
-
         // notify event listeners
-        if (pair.first.getJobsCount() == 0) {
-            feedPipeline.remove(feedId);
-            notifyFeedEventSubscribers(intakeInfo, pair.first.isFailedIngestion()
-                    ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : FeedLifecycleEvent.FEED_ENDED);
-        }
+        feedPipeline.remove(feedId);
+        intakeJobInfos.remove(feedId);
+        notifyFeedEventSubscribers(intakeInfo, pair.first.isFailedIngestion() ? FeedLifecycleEvent.FEED_INTAKE_FAILURE
+                : FeedLifecycleEvent.FEED_INTAKE_ENDED);
     }
 
-    private void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
+    private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
         FeedConnectionId connectionId = cInfo.getConnectionId();
 
         IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
@@ -462,8 +475,6 @@
         JobStatus status = info.getStatus();
         boolean failure = status != null && status.equals(JobStatus.FAILURE);
         FeedPolicyAccessor fpa = new FeedPolicyAccessor(cInfo.getFeedPolicy());
-
-        boolean removeJobHistory = !failure;
         boolean retainSubsription = cInfo.getState().equals(FeedJobState.UNDER_RECOVERY)
                 || (failure && fpa.continueOnHardwareFailure());
 
@@ -474,27 +485,15 @@
                 LOGGER.info(
                         "Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription");
             }
-            removeFeedJointsPostPipelineTermination(cInfo.getConnectionId());
         }
 
-        if (removeJobHistory) {
-            connectJobInfos.remove(connectionId);
-            jobInfos.remove(cInfo.getJobId());
-            feedIntakeProgressTrackers.remove(cInfo.getConnectionId());
-        }
+        connectJobInfos.remove(connectionId);
+        jobInfos.remove(cInfo.getJobId());
+        feedIntakeProgressTrackers.remove(cInfo.getConnectionId());
         deregisterFeedActivity(cInfo);
-
-        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline
-                .get(cInfo.getSourceFeedJoint().getFeedJointKey().getFeedId());
-        pair.first.setJobsCount(pair.first.getJobsCount() - 1);
-        if (pair.first.getJobsCount() == 0) {
-            notifyFeedEventSubscribers(pair.first.getFeedJobInfo(), pair.first.isFailedIngestion()
-                    ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : FeedLifecycleEvent.FEED_ENDED);
-            feedPipeline.remove(cInfo.getSourceFeedJoint().getFeedJointKey().getFeedId());
-        }
-
         // notify event listeners
-        FeedLifecycleEvent event = failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_ENDED;
+        FeedLifecycleEvent event = failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE
+                : FeedLifecycleEvent.FEED_COLLECT_ENDED;
         notifyFeedEventSubscribers(cInfo, event);
     }
 
@@ -547,23 +546,6 @@
         }
     }
 
-    public void removeFeedJointsPostPipelineTermination(FeedConnectionId connectionId) {
-        FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
-        List<IFeedJoint> feedJoints = feedPipeline.get(connectionId.getFeedId()).second;
-
-        IFeedJoint sourceJoint = cInfo.getSourceFeedJoint();
-        List<FeedConnectionId> all = sourceJoint.getReceivers();
-        boolean removeSourceJoint = all.size() < 2;
-        if (removeSourceJoint) {
-            feedJoints.remove(sourceJoint);
-        }
-
-        IFeedJoint computeJoint = cInfo.getComputeFeedJoint();
-        if (computeJoint != null && computeJoint.getReceivers().size() < 2) {
-            feedJoints.remove(computeJoint);
-        }
-    }
-
     public boolean isRegisteredFeedJob(JobId jobId) {
         return jobInfos.get(jobId) != null;
     }
@@ -594,7 +576,8 @@
         return connectJobInfos.get(connectionId).getJobId();
     }
 
-    public void registerFeedEventSubscriber(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber subscriber) {
+    public synchronized void registerFeedEventSubscriber(FeedConnectionId connectionId,
+            IFeedLifecycleEventSubscriber subscriber) {
         List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(connectionId);
         if (subscribers == null) {
             subscribers = new ArrayList<IFeedLifecycleEventSubscriber>();
@@ -610,7 +593,7 @@
         }
     }
 
-    //============================
+    // ============================
 
     public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
         List<IFeedJoint> joints = feedPipeline.containsKey(feedJointKey.getFeedId())
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
index d7129b8..161c863 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
@@ -112,14 +112,14 @@
     }
 
     @Override
-    public void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId) throws HyracksException {
         if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
             jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.JOB_START));
         }
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId) throws HyracksException {
         if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
             jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.JOB_FINISH));
         } else {
@@ -185,7 +185,7 @@
         public enum EventKind {
             JOB_START,
             JOB_FINISH,
-            PROVIDER_READY
+            PARTITION_START
         }
 
         public EventKind eventKind;
@@ -446,12 +446,12 @@
     }
 
     @Override
-    public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
-        return feedJobNotificationHandler.isFeedConnectionActive(connectionId);
+    public synchronized boolean isFeedConnectionActive(FeedConnectionId connectionId,
+            IFeedLifecycleEventSubscriber eventSubscriber) {
+        return feedJobNotificationHandler.isFeedConnectionActive(connectionId, eventSubscriber);
     }
 
     public void reportPartialDisconnection(FeedConnectionId connectionId) {
-        feedJobNotificationHandler.removeFeedJointsPostPipelineTermination(connectionId);
     }
 
     public void registerFeedReportQueue(FeedConnectionId feedId, LinkedBlockingQueue<String> queue) {
@@ -503,8 +503,8 @@
         return feedJobNotificationHandler.getFeedCollectJobId(connectionId);
     }
 
-    public void notifyProviderReady(FeedId feedId, JobId jobId) {
-        jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.PROVIDER_READY, feedId));
+    public synchronized void notifyPartitionStart(FeedId feedId, JobId jobId) {
+        jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.PARTITION_START, feedId));
     }
 
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 3786413..132c8c9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -609,9 +609,9 @@
                     }
                     if (compactionPolicy == null) {
                         if (filterField != null) {
-                            //If the dataset has a filter and the user didn't specify a merge
-                            //policy, then we will pick the
-                            //correlated-prefix as the default merge policy.
+                            // If the dataset has a filter and the user didn't specify a merge
+                            // policy, then we will pick the
+                            // correlated-prefix as the default merge policy.
                             compactionPolicy = GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
                             compactionPolicyProperties = GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
                         }
@@ -632,12 +632,12 @@
 
             }
 
-            //#. initialize DatasetIdFactory if it is not initialized.
+            // #. initialize DatasetIdFactory if it is not initialized.
             if (!DatasetIdFactory.isInitialized()) {
                 DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
             }
 
-            //#. add a new dataset with PendingAddOp
+            // #. add a new dataset with PendingAddOp
             dataset = new Dataset(dataverseName, datasetName, itemTypeDataverseName, itemTypeName,
                     metaItemTypeDataverseName, metaItemTypeName, ngName, compactionPolicy, compactionPolicyProperties,
                     datasetDetails, dd.getHints(), dsType, DatasetIdFactory.generateDatasetId(),
@@ -650,21 +650,21 @@
                 JobSpecification jobSpec = DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
                         metadataProvider);
 
-                //#. make metadataTxn commit before calling runJob.
+                // #. make metadataTxn commit before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-                //#. runJob
+                // #. runJob
                 JobUtils.runJob(hcc, jobSpec, true);
 
-                //#. begin new metadataTxn
+                // #. begin new metadataTxn
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             }
 
-            //#. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
+            // #. add a new dataset with PendingNoOp after deleting the dataset with PendingAddOp
             MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
             dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
             MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
@@ -676,11 +676,11 @@
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
 
-                //#. execute compensation operations
-                //remove the index in NC
-                //[Notice]
-                //As long as we updated(and committed) metadata, we should remove any effect of the job
-                //because an exception occurs during runJob.
+                // #. execute compensation operations
+                // remove the index in NC
+                // [Notice]
+                // As long as we updated(and committed) metadata, we should remove any effect of the job
+                // because an exception occurs during runJob.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -697,7 +697,7 @@
                     }
                 }
 
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -820,7 +820,7 @@
         String indexName = null;
         JobSpecification spec = null;
         Dataset ds = null;
-        //For external datasets
+        // For external datasets
         ArrayList<ExternalFile> externalFilesSnapshot = null;
         boolean firstExternalDatasetIndex = false;
         boolean filesIndexReplicated = false;
@@ -904,10 +904,10 @@
                 }
             }
 
-            //Checks whether a user is trying to create an inverted secondary index on a dataset
-            //with a variable-length primary key.
-            //Currently, we do not support this. Therefore, as a temporary solution, we print an
-            //error message and stop.
+            // Checks whether a user is trying to create an inverted secondary index on a dataset
+            // with a variable-length primary key.
+            // Currently, we do not support this. Therefore, as a temporary solution, we print an
+            // error message and stop.
             if (stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
                     || stmtCreateIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX
                     || stmtCreateIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
@@ -917,7 +917,7 @@
                     IAType keyType = aRecordType.getSubFieldType(partitioningKey);
                     ITypeTraits typeTrait = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
 
-                    //If it is not a fixed length
+                    // If it is not a fixed length
                     if (typeTrait.getFixedLength() < 0) {
                         throw new AlgebricksException("The keyword or ngram index -" + indexName
                                 + " cannot be created on the dataset -" + datasetName
@@ -930,27 +930,27 @@
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 validateIfResourceIsActiveInFeed(dataverseName, datasetName);
             } else {
-                //External dataset
-                //Check if the dataset is indexible
+                // External dataset
+                // Check if the dataset is indexible
                 if (!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) ds.getDatasetDetails())) {
                     throw new AlgebricksException(
                             "dataset using " + ((ExternalDatasetDetails) ds.getDatasetDetails()).getAdapter()
                                     + " Adapter can't be indexed");
                 }
-                //Check if the name of the index is valid
+                // Check if the name of the index is valid
                 if (!ExternalIndexingOperations.isValidIndexName(datasetName, indexName)) {
                     throw new AlgebricksException("external dataset index name is invalid");
                 }
 
-                //Check if the files index exist
+                // Check if the files index exist
                 filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                         datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
                 firstExternalDatasetIndex = (filesIndex == null);
-                //Lock external dataset
+                // Lock external dataset
                 ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
                 datasetLocked = true;
                 if (firstExternalDatasetIndex) {
-                    //Verify that no one has created an index before we acquire the lock
+                    // Verify that no one has created an index before we acquire the lock
                     filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
                             dataverseName, datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
                     if (filesIndex != null) {
@@ -960,20 +960,20 @@
                     }
                 }
                 if (firstExternalDatasetIndex) {
-                    //Get snapshot from External File System
+                    // Get snapshot from External File System
                     externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
-                    //Add an entry for the files index
+                    // Add an entry for the files index
                     filesIndex = new Index(dataverseName, datasetName,
                             ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE,
                             ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
                             ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
                             IMetadataEntity.PENDING_ADD_OP);
                     MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
-                    //Add files to the external files index
+                    // Add files to the external files index
                     for (ExternalFile file : externalFilesSnapshot) {
                         MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
                     }
-                    //This is the first index for the external dataset, replicate the files index
+                    // This is the first index for the external dataset, replicate the files index
                     spec = ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, externalFilesSnapshot,
                             metadataProvider, true);
                     if (spec == null) {
@@ -985,7 +985,7 @@
                 }
             }
 
-            //check whether there exists another enforced index on the same field
+            // check whether there exists another enforced index on the same field
             if (stmtCreateIndex.isEnforced()) {
                 List<Index> indexes = MetadataManager.INSTANCE
                         .getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
@@ -999,7 +999,7 @@
                 }
             }
 
-            //#. add a new index with PendingAddOp
+            // #. add a new index with PendingAddOp
             Index index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
                     keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(),
                     false, IMetadataEntity.PENDING_ADD_OP);
@@ -1011,7 +1011,7 @@
                         Lists.newArrayList(index));
             }
 
-            //#. prepare to create the index artifact in NC.
+            // #. prepare to create the index artifact in NC.
             CompiledCreateIndexStatement cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
                     index.getDatasetName(), index.getKeyFieldNames(), index.getKeyFieldTypes(),
                     index.isEnforcingKeyFileds(), index.getGramLength(), index.getIndexType());
@@ -1025,14 +1025,14 @@
 
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-            //#. create the index artifact in NC.
+            // #. create the index artifact in NC.
             JobUtils.runJob(hcc, spec, true);
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-            //#. load data into the index in NC.
+            // #. load data into the index in NC.
             cis = new CompiledCreateIndexStatement(index.getIndexName(), dataverseName, index.getDatasetName(),
                     index.getKeyFieldNames(), index.getKeyFieldTypes(), index.isEnforcingKeyFileds(),
                     index.getGramLength(), index.getIndexType());
@@ -1042,24 +1042,24 @@
 
             JobUtils.runJob(hcc, spec, true);
 
-            //#. begin new metadataTxn
+            // #. begin new metadataTxn
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-            //#. add another new index with PendingNoOp after deleting the index with PendingAddOp
+            // #. add another new index with PendingNoOp after deleting the index with PendingAddOp
             MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
                     indexName);
             index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
-            //add another new files index with PendingNoOp after deleting the index with
-            //PendingAddOp
+            // add another new files index with PendingNoOp after deleting the index with
+            // PendingAddOp
             if (firstExternalDatasetIndex) {
                 MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
                         filesIndex.getIndexName());
                 filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
                 MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
-                //update transaction timestamp
+                // update transaction timestamp
                 ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(new Date());
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
             }
@@ -1069,7 +1069,7 @@
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
-            //If files index was replicated for external dataset, it should be cleaned up on NC side
+            // If files index was replicated for external dataset, it should be cleaned up on NC side
             if (filesIndexReplicated) {
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
@@ -1090,8 +1090,8 @@
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //remove the index in NC
+                // #. execute compensation operations
+                // remove the index in NC
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1114,7 +1114,7 @@
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     metadataProvider.setMetadataTxnContext(mdTxnCtx);
                     try {
-                        //Drop External Files from metadata
+                        // Drop External Files from metadata
                         MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     } catch (Exception e2) {
@@ -1126,7 +1126,7 @@
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     metadataProvider.setMetadataTxnContext(mdTxnCtx);
                     try {
-                        //Drop the files index from metadata
+                        // Drop the files index from metadata
                         MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                                 datasetName, ExternalIndexingOperations.getFilesIndexName(datasetName));
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1138,7 +1138,7 @@
                                 + ") couldn't be removed from the metadata", e);
                     }
                 }
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1221,7 +1221,7 @@
                 }
             }
 
-            //# disconnect all feeds from any datasets in the dataverse.
+            // # disconnect all feeds from any datasets in the dataverse.
             List<FeedConnectionId> activeFeedConnections = FeedLifecycleListener.INSTANCE
                     .getActiveFeedConnections(null);
             DisconnectFeedStatement disStmt = null;
@@ -1243,13 +1243,13 @@
                                     + connection.getDatasetName() + ". Encountered exception " + exception);
                         }
                     }
-                    //prepare job to remove feed log storage
+                    // prepare job to remove feed log storage
                     jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
                             MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedId.getFeedName())));
                 }
             }
 
-            //#. prepare jobs which will drop corresponding datasets with indexes.
+            // #. prepare jobs which will drop corresponding datasets with indexes.
             List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
             for (int j = 0; j < datasets.size(); j++) {
                 String datasetName = datasets.get(j).getDatasetName();
@@ -1269,7 +1269,7 @@
                     CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
                     jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
                 } else {
-                    //External dataset
+                    // External dataset
                     List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
                             datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
@@ -1289,10 +1289,10 @@
                 }
             }
             jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
-            //#. mark PendingDropOp on the dataverse record by
-            //first, deleting the dataverse record from the DATAVERSE_DATASET
-            //second, inserting the dataverse record with the PendingDropOp value into the
-            //DATAVERSE_DATASET
+            // #. mark PendingDropOp on the dataverse record by
+            // first, deleting the dataverse record from the DATAVERSE_DATASET
+            // second, inserting the dataverse record with the PendingDropOp value into the
+            // DATAVERSE_DATASET
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
             MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
                     new Dataverse(dataverseName, dv.getDataFormat(), IMetadataEntity.PENDING_DROP_OP));
@@ -1309,7 +1309,7 @@
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-            //#. finally, delete the dataverse.
+            // #. finally, delete the dataverse.
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
             if (activeDefaultDataverse != null && activeDefaultDataverse.getDataverseName() == dataverseName) {
                 activeDefaultDataverse = null;
@@ -1325,18 +1325,18 @@
                     activeDefaultDataverse = null;
                 }
 
-                //#. execute compensation operations
-                //remove the all indexes in NC
+                // #. execute compensation operations
+                // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to be compensated.
+                    // do no throw exception since still the metadata needs to be compensated.
                     e.addSuppressed(e2);
                 }
 
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 try {
                     MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
@@ -1383,7 +1383,7 @@
 
             Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<FeedConnectionId, Pair<JobSpecification, Boolean>>();
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                //prepare job spec(s) that would disconnect any active feeds involving the dataset.
+                // prepare job spec(s) that would disconnect any active feeds involving the dataset.
                 List<FeedConnectionId> feedConnections = FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
                 if (feedConnections != null && !feedConnections.isEmpty()) {
                     for (FeedConnectionId connection : feedConnections) {
@@ -1394,14 +1394,14 @@
                             LOGGER.info("Disconnecting feed " + connection.getFeedId().getFeedName() + " from dataset "
                                     + datasetName + " as dataset is being dropped");
                         }
-                        //prepare job to remove feed log storage
+                        // prepare job to remove feed log storage
                         jobsToExecute
                                 .add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed(mdTxnCtx,
                                         connection.getFeedId().getDataverse(), connection.getFeedId().getFeedName())));
                     }
                 }
 
-                //#. prepare jobs to drop the datatset and the indexes in NC
+                // #. prepare jobs to drop the datatset and the indexes in NC
                 List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                 for (int j = 0; j < indexes.size(); j++) {
                     if (indexes.get(j).isSecondaryIndex()) {
@@ -1413,7 +1413,7 @@
                 CompiledDatasetDropStatement cds = new CompiledDatasetDropStatement(dataverseName, datasetName);
                 jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
 
-                //#. mark the existing dataset as PendingDropOp
+                // #. mark the existing dataset as PendingDropOp
                 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
                 MetadataManager.INSTANCE.addDataset(mdTxnCtx,
                         new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
@@ -1426,12 +1426,12 @@
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-                //# disconnect the feeds
+                // # disconnect the feeds
                 for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
                     JobUtils.runJob(hcc, p.first, true);
                 }
 
-                //#. run the jobs
+                // #. run the jobs
                 for (JobSpecification jobSpec : jobsToExecute) {
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
@@ -1440,9 +1440,9 @@
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             } else {
-                //External dataset
+                // External dataset
                 ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-                //#. prepare jobs to drop the datatset and the indexes in NC
+                // #. prepare jobs to drop the datatset and the indexes in NC
                 List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                 for (int j = 0; j < indexes.size(); j++) {
                     if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
@@ -1457,7 +1457,7 @@
                     }
                 }
 
-                //#. mark the existing dataset as PendingDropOp
+                // #. mark the existing dataset as PendingDropOp
                 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
                 MetadataManager.INSTANCE.addDataset(mdTxnCtx,
                         new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
@@ -1469,7 +1469,7 @@
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-                //#. run the jobs
+                // #. run the jobs
                 for (JobSpecification jobSpec : jobsToExecute) {
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
@@ -1481,9 +1481,9 @@
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             }
 
-            //#. finally, delete the dataset.
+            // #. finally, delete the dataset.
             MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, datasetName);
-            //Drop the associated nodegroup
+            // Drop the associated nodegroup
             String nodegroup = ds.getNodeGroupName();
             if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
                 MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, dataverseName + ":" + datasetName);
@@ -1496,18 +1496,18 @@
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //remove the all indexes in NC
+                // #. execute compensation operations
+                // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to be compensated.
+                    // do no throw exception since still the metadata needs to be compensated.
                     e.addSuppressed(e2);
                 }
 
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1542,7 +1542,7 @@
         MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, dataverseName + "." + datasetName);
 
         String indexName = null;
-        //For external index
+        // For external index
         boolean dropFilesIndex = false;
         List<JobSpecification> jobsToExecute = new ArrayList<JobSpecification>();
         try {
@@ -1581,18 +1581,18 @@
                         throw new AlgebricksException("There is no index with this name " + indexName + ".");
                     }
                 }
-                //#. prepare a job to drop the index in NC.
+                // #. prepare a job to drop the index in NC.
                 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
 
-                //#. mark PendingDropOp on the existing index
+                // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
                         new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
                                 index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
 
-                //#. commit the existing transaction before calling runJob.
+                // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
@@ -1601,15 +1601,15 @@
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
 
-                //#. begin a new transaction
+                // #. begin a new transaction
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-                //#. finally, delete the existing index
+                // #. finally, delete the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
             } else {
-                //External dataset
+                // External dataset
                 indexName = stmtIndexDrop.getIndexName().getValue();
                 Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 if (index == null) {
@@ -1622,21 +1622,21 @@
                 } else if (ExternalIndexingOperations.isFileIndex(index)) {
                     throw new AlgebricksException("Dropping a dataset's files index is not allowed.");
                 }
-                //#. prepare a job to drop the index in NC.
+                // #. prepare a job to drop the index in NC.
                 CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, metadataProvider, ds));
                 List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
                         datasetName);
                 if (datasetIndexes.size() == 2) {
                     dropFilesIndex = true;
-                    //only one index + the files index, we need to delete both of the indexes
+                    // only one index + the files index, we need to delete both of the indexes
                     for (Index externalIndex : datasetIndexes) {
                         if (ExternalIndexingOperations.isFileIndex(externalIndex)) {
                             cds = new CompiledIndexDropStatement(dataverseName, datasetName,
                                     externalIndex.getIndexName());
                             jobsToExecute.add(
                                     ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
-                            //#. mark PendingDropOp on the existing files index
+                            // #. mark PendingDropOp on the existing files index
                             MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
                                     externalIndex.getIndexName());
                             MetadataManager.INSTANCE.addIndex(mdTxnCtx,
@@ -1649,14 +1649,14 @@
                     }
                 }
 
-                //#. mark PendingDropOp on the existing index
+                // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
                         new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
                                 index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
 
-                //#. commit the existing transaction before calling runJob.
+                // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
@@ -1665,15 +1665,15 @@
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
 
-                //#. begin a new transaction
+                // #. begin a new transaction
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-                //#. finally, delete the existing index
+                // #. finally, delete the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 if (dropFilesIndex) {
-                    //delete the files index too
+                    // delete the files index too
                     MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
                             ExternalIndexingOperations.getFilesIndexName(datasetName));
                     MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
@@ -1688,18 +1688,18 @@
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //remove the all indexes in NC
+                // #. execute compensation operations
+                // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to be compensated.
+                    // do no throw exception since still the metadata needs to be compensated.
                     e.addSuppressed(e2);
                 }
 
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1953,11 +1953,11 @@
             ICompiledDmlStatement stmt)
             throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
 
-        //Query Rewriting (happens under the same ongoing metadata transaction)
+        // Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
                 sessionConfig);
 
-        //Query Compilation (happens under the same ongoing metadata transaction)
+        // Query Compilation (happens under the same ongoing metadata transaction)
         JobSpecification spec = apiFramework.compileQuery(declaredFunctions, metadataProvider, reWrittenQuery.first,
                 reWrittenQuery.second, stmt == null ? null : stmt.getDatasetName(), sessionConfig, stmt);
 
@@ -2178,7 +2178,8 @@
 
             feedConnId = new FeedConnectionId(dataverseName, cfs.getFeedName(), cfs.getDatasetName().getValue());
 
-            if (FeedLifecycleListener.INSTANCE.isFeedConnectionActive(feedConnId)) {
+            subscriberRegistered = FeedLifecycleListener.INSTANCE.isFeedConnectionActive(feedConnId, eventSubscriber);
+            if (subscriberRegistered) {
                 throw new AsterixException("Feed " + cfs.getFeedName() + " is already connected to dataset "
                         + cfs.getDatasetName().getValue());
             }
@@ -2186,14 +2187,13 @@
             FeedPolicyEntity feedPolicy = FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(),
                     mdTxnCtx);
 
-            //All Metadata checks have passed. Feed connect request is valid. //
+            // All Metadata checks have passed. Feed connect request is valid. //
 
             FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(feedPolicy.getProperties());
             Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = getFeedConnectionRequest(dataverseName,
                     feed, cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
             FeedConnectionRequest connectionRequest = triple.first;
             boolean createFeedIntakeJob = triple.second;
-
             FeedLifecycleListener.INSTANCE.registerFeedEventSubscriber(feedConnId, eventSubscriber);
             subscriberRegistered = true;
             if (createFeedIntakeJob) {
@@ -2202,8 +2202,8 @@
                         feedId.getFeedName());
                 Pair<JobSpecification, IAdapterFactory> pair = FeedOperations.buildFeedIntakeJobSpec(primaryFeed,
                         metadataProvider, policyAccessor);
-                //adapter configuration are valid at this stage
-                //register the feed joints (these are auto-de-registered)
+                // adapter configuration are valid at this stage
+                // register the feed joints (these are auto-de-registered)
                 int numOfPrividers = pair.second.getPartitionConstraint().getLocations().length;
                 for (IFeedJoint fj : triple.third) {
                     FeedLifecycleListener.INSTANCE.registerFeedJoint(fj, numOfPrividers);
@@ -2227,7 +2227,7 @@
             bActiveTxn = false;
             eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
             if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) {
-                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_ENDED); // blocking call
+                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED); // blocking call
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -2270,7 +2270,7 @@
         boolean isFeedJointAvailable = FeedLifecycleListener.INSTANCE.isFeedJointAvailable(feedJointKey);
         if (!isFeedJointAvailable) {
             sourceFeedJoint = FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey);
-            if (sourceFeedJoint == null) { //the feed is currently not being ingested, i.e., it is unavailable.
+            if (sourceFeedJoint == null) { // the feed is currently not being ingested, i.e., it is unavailable.
                 connectionLocation = FeedRuntimeType.INTAKE;
                 FeedId sourceFeedId = feedJointKey.getFeedId(); // the root/primary feedId
                 Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverse, sourceFeedId.getFeedName());
@@ -2290,8 +2290,8 @@
                     functionsToApply.add(f);
                 }
             }
-            //register the compute feed point that represents the final output from the collection of
-            //functions that will be applied.
+            // register the compute feed point that represents the final output from the collection of
+            // functions that will be applied.
             if (!functionsToApply.isEmpty()) {
                 FeedJointKey computeFeedJointKey = new FeedJointKey(feed.getFeedId(), functionsToApply);
                 IFeedJoint computeFeedJoint = new FeedJoint(computeFeedJointKey, feed.getFeedId(),
@@ -2341,7 +2341,6 @@
         DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt;
         String dataverseName = getActiveDataverse(cfs.getDataverseName());
         String datasetName = cfs.getDatasetName().getValue();
-
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2350,7 +2349,9 @@
         Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
 
         FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue());
-        boolean isFeedConnectionActive = FeedLifecycleListener.INSTANCE.isFeedConnectionActive(connectionId);
+        IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
+        boolean isFeedConnectionActive = FeedLifecycleListener.INSTANCE.isFeedConnectionActive(connectionId,
+                eventSubscriber);
         if (!isFeedConnectionActive) {
             throw new AsterixException("Feed " + feed.getFeedId().getFeedName() + " is currently not connected to "
                     + cfs.getDatasetName().getValue() + ". Invalid operation!");
@@ -2377,7 +2378,7 @@
                 CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(connectionId);
                 FeedLifecycleListener.INSTANCE.reportPartialDisconnection(connectionId);
             }
-
+            eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED);
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
@@ -2463,7 +2464,7 @@
             Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
                     ds.getItemTypeDataverseName(), itemTypeName);
 
-            //Prepare jobs to compact the datatset and its indexes
+            // Prepare jobs to compact the datatset and its indexes
             List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
             if (indexes.size() == 0) {
                 throw new AlgebricksException(
@@ -2500,7 +2501,7 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            //#. run the jobs
+            // #. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
                 JobUtils.runJob(hcc, jobSpec, true);
             }
@@ -2548,9 +2549,9 @@
                         ResultReader resultReader = new ResultReader(hcc, hdc);
                         resultReader.open(jobId, metadataProvider.getResultSetId());
 
-                        //In this case (the normal case), we don't use the
-                        //"response" JSONObject - just stream the results
-                        //to the "out" PrintWriter
+                        // In this case (the normal case), we don't use the
+                        // "response" JSONObject - just stream the results
+                        // to the "out" PrintWriter
                         if (sessionConfig.fmt() == OutputFormat.CSV
                                 && sessionConfig.is(SessionConfig.FORMAT_CSV_HEADER)) {
                             ResultUtils.displayCSVHeader(metadataProvider.findOutputRecordType(), sessionConfig);
@@ -2579,7 +2580,7 @@
             throw e;
         } finally {
             MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), query.getDatasets());
-            //release external datasets' locks acquired during compilation of the query
+            // release external datasets' locks acquired during compilation of the query
             ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
@@ -2640,56 +2641,56 @@
             ds = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
                     datasetName);
 
-            //Dataset exists ?
+            // Dataset exists ?
             if (ds == null) {
                 throw new AlgebricksException(
                         "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName);
             }
-            //Dataset external ?
+            // Dataset external ?
             if (ds.getDatasetType() != DatasetType.EXTERNAL) {
                 throw new AlgebricksException(
                         "dataset " + datasetName + " in dataverse " + dataverseName + " is not an external dataset");
             }
-            //Dataset has indexes ?
+            // Dataset has indexes ?
             indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
             if (indexes.size() == 0) {
                 throw new AlgebricksException("External dataset " + datasetName + " in dataverse " + dataverseName
                         + " doesn't have any index");
             }
 
-            //Record transaction time
+            // Record transaction time
             Date txnTime = new Date();
 
-            //refresh lock here
+            // refresh lock here
             ExternalDatasetsRegistry.INSTANCE.refreshBegin(ds);
             lockAquired = true;
 
-            //Get internal files
+            // Get internal files
             metadataFiles = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, ds);
             deletedFiles = new ArrayList<ExternalFile>();
             addedFiles = new ArrayList<ExternalFile>();
             appendedFiles = new ArrayList<ExternalFile>();
 
-            //Compute delta
-            //Now we compare snapshot with external file system
+            // Compute delta
+            // Now we compare snapshot with external file system
             if (ExternalIndexingOperations.isDatasetUptodate(ds, metadataFiles, addedFiles, deletedFiles,
                     appendedFiles)) {
                 ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(txnTime);
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                //latch will be released in the finally clause
+                // latch will be released in the finally clause
                 return;
             }
 
-            //At this point, we know data has changed in the external file system, record
-            //transaction in metadata and start
+            // At this point, we know data has changed in the external file system, record
+            // transaction in metadata and start
             transactionDataset = ExternalIndexingOperations.createTransactionDataset(ds);
             /*
              * Remove old dataset record and replace it with a new one
              */
             MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset);
 
-            //Add delta files to the metadata
+            // Add delta files to the metadata
             for (ExternalFile file : addedFiles) {
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
@@ -2700,7 +2701,7 @@
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
 
-            //Create the files index update job
+            // Create the files index update job
             spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds, metadataFiles, deletedFiles, addedFiles,
                     appendedFiles, metadataProvider);
 
@@ -2708,22 +2709,22 @@
             bActiveTxn = false;
             transactionState = ExternalDatasetTransactionState.BEGIN;
 
-            //run the files update job
+            // run the files update job
             JobUtils.runJob(hcc, spec, true);
 
             for (Index index : indexes) {
                 if (!ExternalIndexingOperations.isFileIndex(index)) {
                     spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, deletedFiles,
                             addedFiles, appendedFiles, metadataProvider);
-                    //run the files update job
+                    // run the files update job
                     JobUtils.runJob(hcc, spec, true);
                 }
             }
 
-            //all index updates has completed successfully, record transaction state
+            // all index updates has completed successfully, record transaction state
             spec = ExternalIndexingOperations.buildCommitJob(ds, indexes, metadataProvider);
 
-            //Aquire write latch again -> start a transaction and record the decision to commit
+            // Aquire write latch again -> start a transaction and record the decision to commit
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             bActiveTxn = true;
@@ -2734,9 +2735,9 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             transactionState = ExternalDatasetTransactionState.READY_TO_COMMIT;
-            //We don't release the latch since this job is expected to be quick
+            // We don't release the latch since this job is expected to be quick
             JobUtils.runJob(hcc, spec, true);
-            //Start a new metadata transaction to record the final state of the transaction
+            // Start a new metadata transaction to record the final state of the transaction
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             bActiveTxn = true;
@@ -2749,11 +2750,11 @@
                     while (iterator.hasNext()) {
                         ExternalFile appendedFile = iterator.next();
                         if (file.getFileName().equals(appendedFile.getFileName())) {
-                            //delete existing file
+                            // delete existing file
                             MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                            //delete existing appended file
+                            // delete existing appended file
                             MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, appendedFile);
-                            //add the original file with appended information
+                            // add the original file with appended information
                             appendedFile.setFileNumber(file.getFileNumber());
                             appendedFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
                             MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, appendedFile);
@@ -2763,24 +2764,24 @@
                 }
             }
 
-            //remove the deleted files delta
+            // remove the deleted files delta
             for (ExternalFile file : deletedFiles) {
                 MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
             }
 
-            //insert new files
+            // insert new files
             for (ExternalFile file : addedFiles) {
                 MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
                 file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
 
-            //mark the transaction as complete
+            // mark the transaction as complete
             ((ExternalDatasetDetails) transactionDataset.getDatasetDetails())
                     .setState(ExternalDatasetTransactionState.COMMIT);
             MetadataManager.INSTANCE.updateDataset(mdTxnCtx, transactionDataset);
 
-            //commit metadata transaction
+            // commit metadata transaction
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             success = true;
         } catch (Exception e) {
@@ -2792,12 +2793,12 @@
                         + datasetName + ") refresh couldn't carry out the commit phase", e);
             }
             if (transactionState == ExternalDatasetTransactionState.COMMIT) {
-                //Nothing to do , everything should be clean
+                // Nothing to do , everything should be clean
                 throw e;
             }
             if (transactionState == ExternalDatasetTransactionState.BEGIN) {
-                //transaction failed, need to do the following
-                //clean NCs removing transaction components
+                // transaction failed, need to do the following
+                // clean NCs removing transaction components
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2807,12 +2808,12 @@
                 try {
                     JobUtils.runJob(hcc, spec, true);
                 } catch (Exception e2) {
-                    //This should never happen -- fix throw illegal
+                    // This should never happen -- fix throw illegal
                     e.addSuppressed(e2);
                     throw new IllegalStateException("System is in inconsistent state. Failed to abort refresh", e);
                 }
-                //remove the delta of files
-                //return the state of the dataset to committed
+                // remove the delta of files
+                // return the state of the dataset to committed
                 try {
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     for (ExternalFile file : deletedFiles) {
@@ -2825,7 +2826,7 @@
                         MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
                     }
                     MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
-                    //commit metadata transaction
+                    // commit metadata transaction
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     abort(e, e2, mdTxnCtx);
@@ -2878,20 +2879,20 @@
                     datasetNameFrom, datasetNameTo, mdTxnCtx);
 
             String pregelixHomeKey = "PREGELIX_HOME";
-            //Finds PREGELIX_HOME in system environment variables.
+            // Finds PREGELIX_HOME in system environment variables.
             String pregelixHome = System.getenv(pregelixHomeKey);
-            //Finds PREGELIX_HOME in Java properties.
+            // Finds PREGELIX_HOME in Java properties.
             if (pregelixHome == null) {
                 pregelixHome = System.getProperty(pregelixHomeKey);
             }
-            //Finds PREGELIX_HOME in AsterixDB configuration.
+            // Finds PREGELIX_HOME in AsterixDB configuration.
             if (pregelixHome == null) {
-                //Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties,
-                //pregelixHome can never be null.
+                // Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties,
+                // pregelixHome can never be null.
                 pregelixHome = AsterixAppContextInfo.getInstance().getCompilerProperties().getPregelixHome();
             }
 
-            //Constructs the pregelix command line.
+            // Constructs the pregelix command line.
             List<String> cmd = constructPregelixCommand(pregelixStmt, dataverseNameFrom, datasetNameFrom,
                     dataverseNameTo, datasetNameTo);
             ProcessBuilder pb = new ProcessBuilder(cmd);
@@ -2900,9 +2901,9 @@
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            //Executes the Pregelix command.
+            // Executes the Pregelix command.
             int resultState = executeExternalShellProgram(pb);
-            //Checks the return state of the external Pregelix command.
+            // Checks the return state of the external Pregelix command.
             if (resultState != 0) {
                 throw new AlgebricksException(
                         "Something went wrong executing your Pregelix Job. Perhaps the Pregelix cluster needs to be restarted. "
@@ -2920,12 +2921,12 @@
         }
     }
 
-    //Prepares to run a program on external runtime.
+    // Prepares to run a program on external runtime.
     private void prepareRunExternalRuntime(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
             RunStatement pregelixStmt, String dataverseNameFrom, String dataverseNameTo, String datasetNameFrom,
             String datasetNameTo, MetadataTransactionContext mdTxnCtx)
             throws AlgebricksException, AsterixException, Exception {
-        //Validates the source/sink dataverses and datasets.
+        // Validates the source/sink dataverses and datasets.
         Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, datasetNameFrom);
         if (fromDataset == null) {
             throw new AsterixException("The source dataset " + datasetNameFrom + " in dataverse " + dataverseNameFrom
@@ -2938,7 +2939,7 @@
         }
 
         try {
-            //Find the primary index of the sink dataset.
+            // Find the primary index of the sink dataset.
             Index toIndex = null;
             List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
                     pregelixStmt.getDatasetNameTo().getValue());
@@ -2951,7 +2952,7 @@
             if (toIndex == null) {
                 throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo);
             }
-            //Cleans up the sink dataset -- Drop and then Create.
+            // Cleans up the sink dataset -- Drop and then Create.
             DropStatement dropStmt = new DropStatement(new Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(),
                     true);
             this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
@@ -2970,12 +2971,12 @@
             throw new AlgebricksException("Error cleaning the result dataset. This should not happen.");
         }
 
-        //Flushes source dataset.
+        // Flushes source dataset.
         FlushDatasetUtils.flushDataset(hcc, metadataProvider, mdTxnCtx, dataverseNameFrom, datasetNameFrom,
                 datasetNameFrom);
     }
 
-    //Executes external shell commands.
+    // Executes external shell commands.
     private int executeExternalShellProgram(ProcessBuilder pb)
             throws IOException, AlgebricksException, InterruptedException {
         Process process = pb.start();
@@ -3001,15 +3002,15 @@
             }
             process.waitFor();
         }
-        //Gets the exit value of the program.
+        // Gets the exit value of the program.
         int resultState = process.exitValue();
         return resultState;
     }
 
-    //Constructs a Pregelix command line.
+    // Constructs a Pregelix command line.
     private List<String> constructPregelixCommand(RunStatement pregelixStmt, String fromDataverseName,
             String fromDatasetName, String toDataverseName, String toDatasetName) {
-        //Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset.
+        // Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset.
         AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
         AsterixClusterProperties clusterProperties = AsterixClusterProperties.INSTANCE;
         String clientIP = clusterProperties.getCluster().getMasterNode().getClientIp();
@@ -3024,7 +3025,7 @@
         asterixdbParameterBuilder.append("pregelix.asterixdb.output.dataset=" + toDatasetName + ",");
         asterixdbParameterBuilder.append("pregelix.asterixdb.output.cleanup=false,");
 
-        //construct command
+        // construct command
         List<String> cmds = new ArrayList<String>();
         cmds.add("bin/pregelix");
         cmds.add(pregelixStmt.getParameters().get(0)); // jar
@@ -3037,7 +3038,7 @@
         String outputConverterClassValue = "=org.apache.pregelix.example.converter.VLongIdOutputVertexConverter,";
         boolean custPropAdded = false;
         boolean meetCustProp = false;
-        //User parameters.
+        // User parameters.
         for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
             if (meetCustProp) {
                 if (!s.contains(inputConverterClassKey)) {
@@ -3059,10 +3060,10 @@
 
         if (!custPropAdded) {
             cmds.add(customizedPregelixProperty);
-            //Appends default converter classes to asterixdbParameterBuilder.
+            // Appends default converter classes to asterixdbParameterBuilder.
             asterixdbParameterBuilder.append(inputConverterClassKey + inputConverterClassValue);
             asterixdbParameterBuilder.append(outputConverterClassKey + outputConverterClassValue);
-            //Remove the last comma.
+            // Remove the last comma.
             asterixdbParameterBuilder.delete(asterixdbParameterBuilder.length() - 1,
                     asterixdbParameterBuilder.length());
             cmds.add(asterixdbParameterBuilder.toString());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 2378032..9dd4025 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -37,7 +37,7 @@
 import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.external.feed.message.FeedProviderReadyMessage;
+import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -91,8 +91,8 @@
     }
 
     private void handleFeedProviderReady(IMessage message) {
-        FeedProviderReadyMessage msg = (FeedProviderReadyMessage) message;
-        FeedLifecycleListener.INSTANCE.notifyProviderReady(msg.getFeedId(), msg.getJobId());
+        FeedPartitionStartMessage msg = (FeedPartitionStartMessage) message;
+        FeedLifecycleListener.INSTANCE.notifyPartitionStart(msg.getFeedId(), msg.getJobId());
     }
 
     private synchronized void handleResourceIdRequest(IMessage message, String nodeId) throws Exception {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
index 7b74ef9..0dd87d6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
@@ -22,29 +22,19 @@
 
 public class FeedOperationCounter {
     private FeedJobInfo feedJobInfo;
-    private int providersCount;
-    private int jobsCount;
+    private int partitionCount;
     private boolean failedIngestion = false;
 
-    public FeedOperationCounter(int providersCount, int jobsCount) {
-        this.providersCount = providersCount;
-        this.jobsCount = jobsCount;
+    public FeedOperationCounter(int partitionCount) {
+        this.partitionCount = partitionCount;
     }
 
-    public int getProvidersCount() {
-        return providersCount;
+    public int getPartitionCount() {
+        return partitionCount;
     }
 
-    public void setProvidersCount(int providersCount) {
-        this.providersCount = providersCount;
-    }
-
-    public int getJobsCount() {
-        return jobsCount;
-    }
-
-    public void setJobsCount(int jobsCount) {
-        this.jobsCount = jobsCount;
+    public void setPartitionCount(int partitionCount) {
+        this.partitionCount = partitionCount;
     }
 
     public boolean isFailedIngestion() {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
index 0c8724e..ad3c1c9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
@@ -27,7 +27,8 @@
         FEED_COLLECT_STARTED,
         FEED_INTAKE_FAILURE,
         FEED_COLLECT_FAILURE,
-        FEED_ENDED
+        FEED_INTAKE_ENDED,
+        FEED_COLLECT_ENDED
     }
 
     public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
index 28b713e..448ea47 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
@@ -45,6 +45,6 @@
 
     public List<String> getCollectLocations(FeedConnectionId feedConnectionId);
 
-    boolean isFeedConnectionActive(FeedConnectionId connectionId);
+    boolean isFeedConnectionActive(FeedConnectionId connectionId, IFeedLifecycleEventSubscriber eventSubscriber);
 
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
similarity index 90%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
index 4c81c5b..49b23ed 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
@@ -22,13 +22,13 @@
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.hyracks.api.job.JobId;
 
-public class FeedProviderReadyMessage extends AbstractApplicationMessage {
+public class FeedPartitionStartMessage extends AbstractApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     private final FeedId feedId;
     private final JobId jobId;
 
-    public FeedProviderReadyMessage(FeedId feedId, JobId jobId) {
+    public FeedPartitionStartMessage(FeedId feedId, JobId jobId) {
         this.feedId = feedId;
         this.jobId = jobId;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
index 3e42169..b69a7b3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
@@ -36,6 +36,7 @@
     private List<String> collectLocations;
     private List<String> computeLocations;
     private List<String> storageLocations;
+    private int partitionStarts = 0;
 
     public FeedConnectJobInfo(JobId jobId, FeedJobState state, FeedConnectionId connectionId,
             IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, JobSpecification spec,
@@ -91,4 +92,12 @@
         this.computeFeedJoint = computeFeedJoint;
     }
 
+    public void partitionStart() {
+        partitionStarts++;
+    }
+
+    public boolean collectionStarted() {
+        return partitionStarts == collectLocations.size();
+    }
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 87e1edb..178d2d5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
@@ -85,6 +86,10 @@
             switch (((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType()) {
                 case INTAKE:
                     handleCompleteConnection();
+                    // Notify CC that Collection started
+                    ctx.sendApplicationMessageToCC(
+                            new FeedPartitionStartMessage(connectionId.getFeedId(), ctx.getJobletContext().getJobId()),
+                            null);
                     break;
                 case COMPUTE:
                     handlePartialConnection();
@@ -93,7 +98,6 @@
                     throw new IllegalStateException("Invalid source type "
                             + ((SubscribableFeedRuntimeId) sourceRuntime.getRuntimeId()).getFeedRuntimeType());
             }
-
             State state = collectRuntime.waitTillCollectionOver();
             if (state.equals(State.FINISHED)) {
                 feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 6771010..cd20900 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -36,7 +36,7 @@
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.message.FeedProviderReadyMessage;
+import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
@@ -117,7 +117,7 @@
                         adapterRuntimeManager, ctx);
                 feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
                 // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests.
-                ctx.sendApplicationMessageToCC(new FeedProviderReadyMessage(feedId, ctx.getJobletContext().getJobId()),
+                ctx.sendApplicationMessageToCC(new FeedPartitionStartMessage(feedId, ctx.getJobletContext().getJobId()),
                         null);
                 feedFrameWriter.open();
             } else {