Add Active Partition Event Message

Enable active runtimes to send messages to the listener. In addition
this change introduces extension locks to metadata lock manager.

Change-Id: I7b4629752e912614927b816d4ce3422ac89c5426
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1596
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index e4a57e6..3c7aa06 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -31,6 +31,7 @@
 
     public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
     public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
+    public static final byte GENERIC_EVENT = 0x02;
     private static final long serialVersionUID = 1L;
     private final ActiveRuntimeId activeRuntimeId;
     private final JobId jobId;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index e64cf14..bd5c214 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -208,9 +208,8 @@
     protected final IStorageComponentProvider componentProvider;
     protected final ExecutorService executorService;
 
-    public QueryTranslator(List<Statement> statements, SessionConfig conf,
-            ILangCompilationProvider compliationProvider, IStorageComponentProvider componentProvider,
-            ExecutorService executorService) {
+    public QueryTranslator(List<Statement> statements, SessionConfig conf, ILangCompilationProvider compliationProvider,
+            IStorageComponentProvider componentProvider, ExecutorService executorService) {
         this.statements = statements;
         this.sessionConfig = conf;
         this.componentProvider = componentProvider;
@@ -457,8 +456,9 @@
         }
     }
 
-    protected void validateCompactionPolicy(String compactionPolicy, Map<String, String> compactionPolicyProperties,
-            MetadataTransactionContext mdTxnCtx, boolean isExternalDataset) throws CompilationException, Exception {
+    protected static void validateCompactionPolicy(String compactionPolicy,
+            Map<String, String> compactionPolicyProperties, MetadataTransactionContext mdTxnCtx,
+            boolean isExternalDataset) throws CompilationException, Exception {
         CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
                 MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
         if (compactionPolicyEntity == null) {
@@ -534,8 +534,8 @@
             if (dt == null) {
                 throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
             }
-            String ngName =
-                    ngNameId != null ? ngNameId.getValue() : configureNodegroupForDataset(dd, dataverseName, mdTxnCtx);
+            String ngName = ngNameId != null ? ngNameId.getValue()
+                    : configureNodegroupForDataset(dataverseName, datasetName, dd.getHints(), mdTxnCtx);
 
             if (compactionPolicy == null) {
                 compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
@@ -585,8 +585,7 @@
                     break;
                 case EXTERNAL:
                     String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-                    Map<String, String> properties =
-                            ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
 
                     datasetDetails =
                             new ExternalDatasetDetails(adapter, properties, new Date(), TransactionState.COMMIT);
@@ -711,22 +710,22 @@
         }
     }
 
-    protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse,
-            MetadataTransactionContext mdTxnCtx) throws CompilationException {
+    protected static String configureNodegroupForDataset(String dataverseName, String datasetName,
+            Map<String, String> hints, MetadataTransactionContext mdTxnCtx) throws CompilationException {
         int nodegroupCardinality;
         String nodegroupName;
-        String hintValue = dd.getHints().get(DatasetNodegroupCardinalityHint.NAME);
+        String hintValue = hints.get(DatasetNodegroupCardinalityHint.NAME);
         if (hintValue == null) {
             nodegroupName = MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME;
             return nodegroupName;
         } else {
             int numChosen = 0;
             boolean valid = DatasetHints.validate(DatasetNodegroupCardinalityHint.NAME,
-                    dd.getHints().get(DatasetNodegroupCardinalityHint.NAME)).first;
+                    hints.get(DatasetNodegroupCardinalityHint.NAME)).first;
             if (!valid) {
                 throw new CompilationException("Incorrect use of hint:" + DatasetNodegroupCardinalityHint.NAME);
             } else {
-                nodegroupCardinality = Integer.parseInt(dd.getHints().get(DatasetNodegroupCardinalityHint.NAME));
+                nodegroupCardinality = Integer.parseInt(hints.get(DatasetNodegroupCardinalityHint.NAME));
             }
             List<String> nodeNames = AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames();
             List<String> nodeNamesClone = new ArrayList<>(nodeNames);
@@ -753,7 +752,7 @@
                     b[selected] = temp;
                 }
             }
-            nodegroupName = dataverse + ":" + dd.getName().getValue();
+            nodegroupName = dataverseName + ":" + datasetName;
             MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(nodegroupName, selectedNodes));
             return nodegroupName;
         }
@@ -921,11 +920,10 @@
                     // Get snapshot from External File System
                     externalFilesSnapshot = ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
                     // Add an entry for the files index
-                    filesIndex =
-                            new Index(dataverseName, datasetName, IndexingConstants.getFilesIndexName(datasetName),
-                                    IndexType.BTREE, ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
-                                    ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
-                                    MetadataUtil.PENDING_ADD_OP);
+                    filesIndex = new Index(dataverseName, datasetName, IndexingConstants.getFilesIndexName(datasetName),
+                            IndexType.BTREE, ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, null,
+                            ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, false, false,
+                            MetadataUtil.PENDING_ADD_OP);
                     MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
                     // Add files to the external files index
                     for (ExternalFile file : externalFilesSnapshot) {
@@ -960,8 +958,8 @@
 
             // #. add a new index with PendingAddOp
             index = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
-                    keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(),
-                    stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP);
+                    keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(),
+                    false, MetadataUtil.PENDING_ADD_OP);
             MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), index);
 
             ARecordType enforcedType = null;
@@ -1013,8 +1011,8 @@
             // add another new files index with PendingNoOp after deleting the index with
             // PendingAddOp
             if (firstExternalDatasetIndex) {
-                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                        datasetName, filesIndex.getIndexName());
+                MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+                        filesIndex.getIndexName());
                 filesIndex.setPendingOp(MetadataUtil.PENDING_NO_OP);
                 MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), filesIndex);
                 // update transaction timestamp
@@ -1186,8 +1184,8 @@
                     stopFeedBeforeDelete(new Pair<>(dvId, new Identifier(activeEntityId.getEntityName())),
                             metadataProvider);
                     // prepare job to remove feed log storage
-                    jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE
-                            .getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName())));
+                    jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
+                            MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, activeEntityId.getEntityName())));
                 }
             }
 
@@ -1201,8 +1199,8 @@
                             MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if (indexes.get(k).isSecondaryIndex()) {
-                            jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k),
-                                    metadataProvider, datasets.get(j)));
+                            jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k), metadataProvider,
+                                    datasets.get(j)));
                         }
                     }
                     Index primaryIndex =
@@ -1217,8 +1215,8 @@
                             jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider,
                                     datasets.get(j)));
                         } else {
-                            jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k),
-                                    metadataProvider, datasets.get(j)));
+                            jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(k), metadataProvider,
+                                    datasets.get(j)));
                         }
                     }
                     ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(datasets.get(j));
@@ -1311,6 +1309,11 @@
         DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
         String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
         String datasetName = stmtDelete.getDatasetName().getValue();
+        doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc);
+    }
+
+    public static void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider,
+            boolean ifExists, IHyracksClientConnection hcc) throws Exception {
         MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
         MutableObject<MetadataTransactionContext> mdTxnCtx =
                 new MutableObject<>(MetadataManager.INSTANCE.beginTransaction());
@@ -1321,12 +1324,12 @@
         try {
             Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
             if (ds == null) {
-                if (stmtDelete.getIfExists()) {
+                if (ifExists) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
                     return;
                 } else {
-                    throw new AlgebricksException("There is no dataset with this name " + datasetName
-                            + " in dataverse " + dataverseName + ".");
+                    throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+                            + dataverseName + ".");
                 }
             }
             ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc);
@@ -1362,7 +1365,6 @@
                             + "." + datasetName + ") couldn't be removed from the metadata", e);
                 }
             }
-
             throw e;
         } finally {
             MetadataLockManager.INSTANCE.dropDatasetEnd(dataverseName, dataverseName + "." + datasetName);
@@ -1370,104 +1372,6 @@
         }
     }
 
-    protected void doDropDataset(Dataset ds, String datasetName, MetadataProvider metadataProvider,
-            MutableObject<MetadataTransactionContext> mdTxnCtx, List<JobSpecification> jobsToExecute,
-            String dataverseName, MutableBoolean bActiveTxn, MutableObject<ProgressState> progress,
-            IHyracksClientConnection hcc) throws Exception {
-        Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>();
-        if (ds.getDatasetType() == DatasetType.INTERNAL) {
-            // prepare job spec(s) that would disconnect any active feeds involving the dataset.
-            IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
-            for (IActiveEntityEventsListener listener : activeListeners) {
-                if (listener.isEntityUsingDataset(ds)) {
-                    throw new CompilationException(
-                            "Can't drop dataset since it is connected to active entity: " + listener.getEntityId());
-                }
-            }
-
-            // #. prepare jobs to drop the datatset and the indexes in NC
-            List<Index> indexes =
-                    MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName);
-            for (int j = 0; j < indexes.size(); j++) {
-                if (indexes.get(j).isSecondaryIndex()) {
-                    jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(j), metadataProvider, ds));
-                }
-            }
-            Index primaryIndex =
-                    MetadataManager.INSTANCE.getIndex(mdTxnCtx.getValue(), dataverseName, datasetName, datasetName);
-            jobsToExecute.add(DatasetUtil.dropDatasetJobSpec(ds, primaryIndex, metadataProvider));
-            // #. mark the existing dataset as PendingDropOp
-            MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
-            MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(),
-                    new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
-                            ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName(), ds.getNodeGroupName(),
-                            ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(), ds.getDatasetDetails(),
-                            ds.getHints(), ds.getDatasetType(), ds.getDatasetId(), MetadataUtil.PENDING_DROP_OP));
-
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
-            bActiveTxn.setValue(false);
-            progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
-
-            // # disconnect the feeds
-            for (Pair<JobSpecification, Boolean> p : disconnectJobList.values()) {
-                JobUtils.runJob(hcc, p.first, true);
-            }
-
-            // #. run the jobs
-            for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
-            }
-
-            mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
-            bActiveTxn.setValue(true);
-            metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
-        } else {
-            // External dataset
-            ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-            // #. prepare jobs to drop the datatset and the indexes in NC
-            List<Index> indexes =
-                    MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx.getValue(), dataverseName, datasetName);
-            for (int j = 0; j < indexes.size(); j++) {
-                if (ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
-                    jobsToExecute.add(IndexUtil.buildDropSecondaryIndexJobSpec(indexes.get(j), metadataProvider, ds));
-                } else {
-                    jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds));
-                }
-            }
-
-            // #. mark the existing dataset as PendingDropOp
-            MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
-            MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(),
-                    new Dataset(dataverseName, datasetName, ds.getItemTypeDataverseName(), ds.getItemTypeName(),
-                            ds.getNodeGroupName(), ds.getCompactionPolicy(), ds.getCompactionPolicyProperties(),
-                            ds.getDatasetDetails(), ds.getHints(), ds.getDatasetType(), ds.getDatasetId(),
-                            MetadataUtil.PENDING_DROP_OP));
-
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
-            bActiveTxn.setValue(false);
-            progress.setValue(ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA);
-
-            // #. run the jobs
-            for (JobSpecification jobSpec : jobsToExecute) {
-                JobUtils.runJob(hcc, jobSpec, true);
-            }
-            if (!indexes.isEmpty()) {
-                ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-            }
-            mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
-            bActiveTxn.setValue(true);
-            metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
-        }
-
-        // #. finally, delete the dataset.
-        MetadataManager.INSTANCE.dropDataset(mdTxnCtx.getValue(), dataverseName, datasetName);
-        // Drop the associated nodegroup
-        String nodegroup = ds.getNodeGroupName();
-        if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
-            MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx.getValue(), dataverseName + ":" + datasetName);
-        }
-    }
-
     protected void handleIndexDropStatement(MetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
@@ -1524,10 +1428,9 @@
                 // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(), index.getKeyFieldSourceIndicators(),
-                                index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
-                                MetadataUtil.PENDING_DROP_OP));
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
+                                index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
+                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(), MetadataUtil.PENDING_DROP_OP));
 
                 // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1586,10 +1489,9 @@
                 // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(), index.getKeyFieldSourceIndicators(),
-                                index.getKeyFieldTypes(), index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
-                                MetadataUtil.PENDING_DROP_OP));
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
+                                index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
+                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(), MetadataUtil.PENDING_DROP_OP));
 
                 // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1648,8 +1550,8 @@
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName
-                            + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
+                    throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "."
+                            + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
                 }
             }
 
@@ -1783,8 +1685,7 @@
             CompiledLoadFromFileStatement cls =
                     new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(),
                             loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted());
-            JobSpecification spec =
-                    apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, cls);
+            JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionConfig, cls);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
@@ -1989,8 +1890,8 @@
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE
-                    .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy);
+            FeedPolicyEntity feedPolicy =
+                    MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy);
             if (feedPolicy != null) {
                 if (cfps.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2002,8 +1903,8 @@
             boolean extendingExisting = cfps.getSourcePolicyName() != null;
             String description = cfps.getDescription() == null ? "" : cfps.getDescription();
             if (extendingExisting) {
-                FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(
-                        metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
+                FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE
+                        .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
                 if (sourceFeedPolicy == null) {
                     sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
                             MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
@@ -2135,8 +2036,7 @@
         }
         // Start
         try {
-            MetadataLockManager.INSTANCE.startFeedBegin(dataverseName, dataverseName + "." + feedName,
-                    feedConnections);
+            MetadataLockManager.INSTANCE.startFeedBegin(dataverseName, dataverseName + "." + feedName, feedConnections);
             // Prepare policy
             List<IDataset> datasets = new ArrayList<>();
             for (FeedConnection connection : feedConnections) {
@@ -2761,8 +2661,8 @@
                 handlePregelixStatement(metadataProvider, runStmt, hcc);
                 break;
             default:
-                throw new AlgebricksException("The system \"" + runStmt.getSystem()
-                        + "\" specified in your run statement is not supported.");
+                throw new AlgebricksException(
+                        "The system \"" + runStmt.getSystem() + "\" specified in your run statement is not supported.");
         }
 
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 55de9c7..a73a5cc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -302,7 +302,7 @@
 
     private IStatementExecutorFactory getStatementExecutorFactory() {
         return ccExtensionManager.getStatementExecutorFactory(
-                ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutorService());
+                ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutor());
     }
 
     @Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index ad75b6b..cd8cf3b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -609,4 +609,8 @@
     public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
         return NoOpFrameOperationCallbackFactory.INSTANCE;
     }
+
+    public boolean isTemp() {
+        return getDatasetDetails().isTemp();
+    }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
index 81eaa9b..9292008 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockManager.java
@@ -22,14 +22,16 @@
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
 
 import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.FeedConnection;
 
 public class MetadataLockManager {
 
     public static final MetadataLockManager INSTANCE = new MetadataLockManager();
+    private static final Function<String, ReentrantReadWriteLock> LOCK_FUNCTION = key -> new ReentrantReadWriteLock();
+    private static final Function<String, DatasetLock> DATASET_LOCK_FUNCTION = key -> new DatasetLock();
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataversesLocks;
     private final ConcurrentHashMap<String, DatasetLock> datasetsLocks;
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> functionsLocks;
@@ -38,6 +40,7 @@
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> feedPolicyLocks;
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> compactionPolicyLocks;
     private final ConcurrentHashMap<String, ReentrantReadWriteLock> dataTypeLocks;
+    private final ConcurrentHashMap<String, ReentrantReadWriteLock> extensionLocks;
 
     private MetadataLockManager() {
         dataversesLocks = new ConcurrentHashMap<>();
@@ -48,14 +51,11 @@
         feedPolicyLocks = new ConcurrentHashMap<>();
         compactionPolicyLocks = new ConcurrentHashMap<>();
         dataTypeLocks = new ConcurrentHashMap<>();
+        extensionLocks = new ConcurrentHashMap<>();
     }
 
     public void acquireDataverseReadLock(String dataverseName) {
-        ReentrantReadWriteLock dvLock = dataversesLocks.get(dataverseName);
-        if (dvLock == null) {
-            dataversesLocks.putIfAbsent(dataverseName, new ReentrantReadWriteLock());
-            dvLock = dataversesLocks.get(dataverseName);
-        }
+        ReentrantReadWriteLock dvLock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
         dvLock.readLock().lock();
     }
 
@@ -64,11 +64,7 @@
     }
 
     public void acquireDataverseWriteLock(String dataverseName) {
-        ReentrantReadWriteLock dvLock = dataversesLocks.get(dataverseName);
-        if (dvLock == null) {
-            dataversesLocks.putIfAbsent(dataverseName, new ReentrantReadWriteLock());
-            dvLock = dataversesLocks.get(dataverseName);
-        }
+        ReentrantReadWriteLock dvLock = dataversesLocks.computeIfAbsent(dataverseName, LOCK_FUNCTION);
         dvLock.writeLock().lock();
     }
 
@@ -77,11 +73,7 @@
     }
 
     public void acquireDatasetReadLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.get(datasetName);
-        if (dsLock == null) {
-            datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
-            dsLock = datasetsLocks.get(datasetName);
-        }
+        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
         dsLock.acquireReadLock();
     }
 
@@ -90,11 +82,7 @@
     }
 
     public void acquireDatasetWriteLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.get(datasetName);
-        if (dsLock == null) {
-            datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
-            dsLock = datasetsLocks.get(datasetName);
-        }
+        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
         dsLock.acquireWriteLock();
     }
 
@@ -103,11 +91,7 @@
     }
 
     public void acquireDatasetModifyLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.get(datasetName);
-        if (dsLock == null) {
-            datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
-            dsLock = datasetsLocks.get(datasetName);
-        }
+        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
         dsLock.acquireReadLock();
         dsLock.acquireReadModifyLock();
     }
@@ -119,11 +103,7 @@
     }
 
     public void acquireDatasetCreateIndexLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.get(datasetName);
-        if (dsLock == null) {
-            datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
-            dsLock = datasetsLocks.get(datasetName);
-        }
+        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
         dsLock.acquireReadLock();
         dsLock.acquireWriteModifyLock();
     }
@@ -135,11 +115,7 @@
     }
 
     public void acquireExternalDatasetRefreshLock(String datasetName) {
-        DatasetLock dsLock = datasetsLocks.get(datasetName);
-        if (dsLock == null) {
-            datasetsLocks.putIfAbsent(datasetName, new DatasetLock());
-            dsLock = datasetsLocks.get(datasetName);
-        }
+        DatasetLock dsLock = datasetsLocks.computeIfAbsent(datasetName, DATASET_LOCK_FUNCTION);
         dsLock.acquireReadLock();
         dsLock.acquireRefreshLock();
     }
@@ -151,11 +127,7 @@
     }
 
     public void acquireFunctionReadLock(String functionName) {
-        ReentrantReadWriteLock fLock = functionsLocks.get(functionName);
-        if (fLock == null) {
-            functionsLocks.putIfAbsent(functionName, new ReentrantReadWriteLock());
-            fLock = functionsLocks.get(functionName);
-        }
+        ReentrantReadWriteLock fLock = functionsLocks.computeIfAbsent(functionName, LOCK_FUNCTION);
         fLock.readLock().lock();
     }
 
@@ -164,11 +136,7 @@
     }
 
     public void acquireFunctionWriteLock(String functionName) {
-        ReentrantReadWriteLock fLock = functionsLocks.get(functionName);
-        if (fLock == null) {
-            functionsLocks.putIfAbsent(functionName, new ReentrantReadWriteLock());
-            fLock = functionsLocks.get(functionName);
-        }
+        ReentrantReadWriteLock fLock = functionsLocks.computeIfAbsent(functionName, LOCK_FUNCTION);
         fLock.writeLock().lock();
     }
 
@@ -177,11 +145,7 @@
     }
 
     public void acquireNodeGroupReadLock(String nodeGroupName) {
-        ReentrantReadWriteLock ngLock = nodeGroupsLocks.get(nodeGroupName);
-        if (ngLock == null) {
-            nodeGroupsLocks.putIfAbsent(nodeGroupName, new ReentrantReadWriteLock());
-            ngLock = nodeGroupsLocks.get(nodeGroupName);
-        }
+        ReentrantReadWriteLock ngLock = nodeGroupsLocks.computeIfAbsent(nodeGroupName, LOCK_FUNCTION);
         ngLock.readLock().lock();
     }
 
@@ -190,11 +154,7 @@
     }
 
     public void acquireNodeGroupWriteLock(String nodeGroupName) {
-        ReentrantReadWriteLock ngLock = nodeGroupsLocks.get(nodeGroupName);
-        if (ngLock == null) {
-            nodeGroupsLocks.putIfAbsent(nodeGroupName, new ReentrantReadWriteLock());
-            ngLock = nodeGroupsLocks.get(nodeGroupName);
-        }
+        ReentrantReadWriteLock ngLock = nodeGroupsLocks.computeIfAbsent(nodeGroupName, LOCK_FUNCTION);
         ngLock.writeLock().lock();
     }
 
@@ -203,11 +163,7 @@
     }
 
     public void acquireFeedReadLock(String feedName) {
-        ReentrantReadWriteLock fLock = feedsLocks.get(feedName);
-        if (fLock == null) {
-            feedsLocks.putIfAbsent(feedName, new ReentrantReadWriteLock());
-            fLock = feedsLocks.get(feedName);
-        }
+        ReentrantReadWriteLock fLock = feedsLocks.computeIfAbsent(feedName, LOCK_FUNCTION);
         fLock.readLock().lock();
     }
 
@@ -216,11 +172,7 @@
     }
 
     public void acquireFeedWriteLock(String feedName) {
-        ReentrantReadWriteLock fLock = feedsLocks.get(feedName);
-        if (fLock == null) {
-            feedsLocks.putIfAbsent(feedName, new ReentrantReadWriteLock());
-            fLock = feedsLocks.get(feedName);
-        }
+        ReentrantReadWriteLock fLock = feedsLocks.computeIfAbsent(feedName, LOCK_FUNCTION);
         fLock.writeLock().lock();
     }
 
@@ -229,11 +181,7 @@
     }
 
     public void acquireFeedPolicyWriteLock(String policyName) {
-        ReentrantReadWriteLock fLock = feedPolicyLocks.get(policyName);
-        if (fLock == null) {
-            feedPolicyLocks.putIfAbsent(policyName, new ReentrantReadWriteLock());
-            fLock = feedPolicyLocks.get(policyName);
-        }
+        ReentrantReadWriteLock fLock = feedPolicyLocks.computeIfAbsent(policyName, LOCK_FUNCTION);
         fLock.writeLock().lock();
     }
 
@@ -242,11 +190,8 @@
     }
 
     public void acquireCompactionPolicyReadLock(String compactionPolicyName) {
-        ReentrantReadWriteLock compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
-        if (compactionPolicyLock == null) {
-            compactionPolicyLocks.putIfAbsent(compactionPolicyName, new ReentrantReadWriteLock());
-            compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
-        }
+        ReentrantReadWriteLock compactionPolicyLock =
+                compactionPolicyLocks.computeIfAbsent(compactionPolicyName, LOCK_FUNCTION);
         compactionPolicyLock.readLock().lock();
     }
 
@@ -255,11 +200,8 @@
     }
 
     public void acquireCompactionPolicyWriteLock(String compactionPolicyName) {
-        ReentrantReadWriteLock compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
-        if (compactionPolicyLock == null) {
-            compactionPolicyLocks.putIfAbsent(compactionPolicyName, new ReentrantReadWriteLock());
-            compactionPolicyLock = compactionPolicyLocks.get(compactionPolicyName);
-        }
+        ReentrantReadWriteLock compactionPolicyLock =
+                compactionPolicyLocks.computeIfAbsent(compactionPolicyName, LOCK_FUNCTION);
         compactionPolicyLock.writeLock().lock();
     }
 
@@ -268,11 +210,7 @@
     }
 
     public void acquireDataTypeReadLock(String dataTypeName) {
-        ReentrantReadWriteLock dataTypeLock = dataTypeLocks.get(dataTypeName);
-        if (dataTypeLock == null) {
-            dataTypeLocks.putIfAbsent(dataTypeName, new ReentrantReadWriteLock());
-            dataTypeLock = dataTypeLocks.get(dataTypeName);
-        }
+        ReentrantReadWriteLock dataTypeLock = dataTypeLocks.computeIfAbsent(dataTypeName, LOCK_FUNCTION);
         dataTypeLock.readLock().lock();
     }
 
@@ -281,11 +219,7 @@
     }
 
     public void acquireDataTypeWriteLock(String dataTypeName) {
-        ReentrantReadWriteLock dataTypeLock = dataTypeLocks.get(dataTypeName);
-        if (dataTypeLock == null) {
-            dataTypeLocks.putIfAbsent(dataTypeName, new ReentrantReadWriteLock());
-            dataTypeLock = dataTypeLocks.get(dataTypeName);
-        }
+        ReentrantReadWriteLock dataTypeLock = dataTypeLocks.computeIfAbsent(dataTypeName, LOCK_FUNCTION);
         dataTypeLock.writeLock().lock();
     }
 
@@ -410,8 +344,8 @@
         releaseDataverseReadLock(dataverseName);
     }
 
-    public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName,
-            List<String> dataverses, List<String> datasets) {
+    public void insertDeleteUpsertBegin(String dataverseName, String datasetFullyQualifiedName, List<String> dataverses,
+            List<String> datasets) {
         dataverses.add(dataverseName);
         datasets.add(datasetFullyQualifiedName);
         Collections.sort(dataverses);
@@ -632,4 +566,22 @@
         releaseExternalDatasetRefreshLock(datasetFullyQualifiedName);
         releaseDataverseReadLock(dataverseName);
     }
+
+    public void acquireExtensionReadLock(String entityName) {
+        ReentrantReadWriteLock entityLock = extensionLocks.computeIfAbsent(entityName, LOCK_FUNCTION);
+        entityLock.readLock().lock();
+    }
+
+    public void releaseExtensionReadLock(String entityName) {
+        extensionLocks.get(entityName).readLock().unlock();
+    }
+
+    public void acquireExtensionWriteLock(String entityName) {
+        ReentrantReadWriteLock entityLock = extensionLocks.computeIfAbsent(entityName, LOCK_FUNCTION);
+        entityLock.writeLock().lock();
+    }
+
+    public void releaseExtensionWriteLock(String entityName) {
+        extensionLocks.get(entityName).writeLock().unlock();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
index aecc643..540de3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.api.service;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.hyracks.api.application.IServiceContext;
 
 public interface IControllerService {
@@ -27,5 +29,7 @@
 
     IServiceContext getContext();
 
+    ExecutorService getExecutor();
+
     Object getApplicationContext();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 0835551..c47a612 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -34,7 +34,6 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.TreeMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.logging.Level;
@@ -325,14 +324,11 @@
         return workQueue;
     }
 
-    public ExecutorService getExecutorService() {
+    @Override
+    public ExecutorService getExecutor() {
         return executor;
     }
 
-    public Executor getExecutor() {
-        return getExecutorService();
-    }
-
     public CCConfig getConfig() {
         return ccConfig;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 46aa992..d7b4be0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -217,7 +217,7 @@
             LOGGER.warning("Freeing leaked " + stillAllocated + " bytes");
             serviceCtx.getMemoryManager().deallocate(stillAllocated);
         }
-        nodeController.getExecutorService().execute(new Runnable() {
+        nodeController.getExecutor().execute(new Runnable() {
             @Override
             public void run() {
                 deallocatableRegistry.close();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index e6278be..c416942 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -118,12 +118,12 @@
 
             case SHUTDOWN_REQUEST:
                 final CCNCFunctions.ShutdownRequestFunction sdrf = (CCNCFunctions.ShutdownRequestFunction) fn;
-                ncs.getExecutorService().submit(new ShutdownTask(ncs, sdrf.isTerminateNCService()));
+                ncs.getExecutor().submit(new ShutdownTask(ncs, sdrf.isTerminateNCService()));
                 return;
 
             case THREAD_DUMP_REQUEST:
                 final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn;
-                ncs.getExecutorService().submit(new ThreadDumpTask(ncs, tdrf.getRequestId()));
+                ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId()));
                 return;
 
             default:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 14d221e..9c28c16 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -416,7 +416,8 @@
         return nodeParameters;
     }
 
-    public ExecutorService getExecutorService() {
+    @Override
+    public ExecutorService getExecutor() {
         return executor;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index 8f68e76..670ce06 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -53,7 +53,7 @@
         }
         final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
         ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
-        ncs.getExecutorService().execute(new Runnable() {
+        ncs.getExecutor().execute(new Runnable() {
             @Override
             public void run() {
                 for (IPartition p : unregisteredPartitions) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 692eac6..02e8051 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -131,7 +131,7 @@
                 }
                 final int partition = tid.getPartition();
                 List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
-                task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs,
+                task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
                         createInputChannels(td, inputs));
                 IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
 
@@ -207,7 +207,7 @@
                 .getInputPartitionCounts()[i], td.getPartitionCount());
         if (cPolicy.materializeOnReceiveSide()) {
             return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task
-                    .getTaskAttemptId(), ncs.getExecutorService());
+                    .getTaskAttemptId(), ncs.getExecutor());
         } else {
             return collector;
         }
@@ -223,7 +223,7 @@
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
                         return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
-                                conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutorService());
+                                conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
                     }
                 };
             } else {
@@ -233,7 +233,7 @@
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
                         return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
                                 jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs
-                                        .getExecutorService());
+                                        .getExecutor());
                     }
                 };
             }