Refactor Messaging

In this refactoring, each message implementation includes
a handle method. This avoids bloating of message brokers
and enable better extensibility for messaging.

Change-Id: I7c918bf504058c98ecf89f5b019503278e9aa01f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1128
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
similarity index 94%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java
rename to asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
index fe15ce8..06e9ad1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ActiveLifecycleListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.app.external;
+package org.apache.asterix.active;
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -24,9 +24,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveEvent;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index a6e1788..50fa257 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -20,7 +20,12 @@
 
 import java.io.Serializable;
 
+import org.apache.asterix.active.ActiveManager;
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
 
 public class ActiveManagerMessage extends AbstractApplicationMessage {
     public static final byte STOP_ACTIVITY = 0x00;
@@ -36,11 +41,6 @@
         this.payload = payload;
     }
 
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.ACTIVE_MANAGER_MESSAGE;
-    }
-
     public Serializable getPayload() {
         return payload;
     }
@@ -52,4 +52,17 @@
     public String getSrc() {
         return src;
     }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext appContext =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        ((ActiveManager) appContext.getActiveManager()).submit(this);
+    }
+
+    @Override
+    public String type() {
+        return "ACTIVE_MANAGER_MESSAGE";
+    }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index f5bdf39..02affc4 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -20,9 +20,12 @@
 
 import java.io.Serializable;
 
+import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.service.IControllerService;
 
 public class ActivePartitionMessage extends AbstractApplicationMessage {
 
@@ -45,11 +48,6 @@
         this.payload = payload;
     }
 
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.ACTIVE_ENTITY_TO_CC_MESSAGE;
-    }
-
     public ActiveRuntimeId getActiveRuntimeId() {
         return activeRuntimeId;
     }
@@ -65,4 +63,14 @@
     public byte getEvent() {
         return event;
     }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        ActiveLifecycleListener.INSTANCE.receive(this);
+    }
+
+    @Override
+    public String type() {
+        return "ACTIVE_ENTITY_TO_CC_MESSAGE";
+    }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 4e1d9b0..ec2b41e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -26,6 +26,7 @@
 import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
 import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
 import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
@@ -39,13 +40,12 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
 import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
 import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.optimizer.rules.am.InvertedIndexJobGenParams;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -108,7 +108,7 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         AbstractUnnestMapOperator unnestMapOp = (AbstractUnnestMapOperator) op;
         ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
         if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
@@ -258,7 +258,7 @@
                     .getBinaryTokenizerFactory(searchModifierType, searchKeyType, secondaryIndex);
             IIndexDataflowHelperFactory dataflowHelperFactory;
 
-            AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+            AsterixStorageProperties storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
                     .getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
             boolean temp = dataset.getDatasetDetails().isTemp();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
index bf7b975..10b601b 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
@@ -37,8 +37,8 @@
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
 /**
@@ -55,7 +55,7 @@
 
         if (!(AsterixClusterProperties.INSTANCE.getState().equals(ClusterState.ACTIVE)
                 && AsterixClusterProperties.INSTANCE.isGlobalRecoveryCompleted())) {
-            int maxWaitCycles = AsterixAppContextInfo.getInstance().getExternalProperties().getMaxWaitClusterActive();
+            int maxWaitCycles = AsterixAppContextInfo.INSTANCE.getExternalProperties().getMaxWaitClusterActive();
             int waitCycleCount = 0;
             try {
                 while (!AsterixClusterProperties.INSTANCE.getState().equals(ClusterState.ACTIVE)
@@ -84,7 +84,7 @@
         }
 
         if (!AsterixClusterProperties.INSTANCE.isGlobalRecoveryCompleted()) {
-            int maxWaitCycles = AsterixAppContextInfo.getInstance().getExternalProperties().getMaxWaitClusterActive();
+            int maxWaitCycles = AsterixAppContextInfo.INSTANCE.getExternalProperties().getMaxWaitClusterActive();
             int waitCycleCount = 0;
             try {
                 while (!AsterixClusterProperties.INSTANCE.isGlobalRecoveryCompleted()
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index ed9c1e6..56a5a57 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -93,8 +93,8 @@
 import org.apache.asterix.om.functions.AsterixFunctionInfo;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
@@ -656,7 +656,7 @@
         String outputDir = System.getProperty("java.io.tmpDir");
         String filePath = outputDir + System.getProperty("file.separator") + OUTPUT_FILE_PREFIX
                 + outputFileID.incrementAndGet();
-        AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.getInstance().getMetadataProperties();
+        AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.INSTANCE.getMetadataProperties();
         return new FileSplit(metadataProperties.getMetadataNodeName(), new FileReference(new File(filePath)));
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index 5728947..76d0245 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -51,9 +51,9 @@
 import org.apache.asterix.lang.common.statement.FunctionDecl;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.optimizer.base.RuleCollections;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -239,13 +239,13 @@
         }
 
         //print the plot for the logical plan
-        AsterixExternalProperties xProps = AsterixAppContextInfo.getInstance().getExternalProperties();
+        AsterixExternalProperties xProps = AsterixAppContextInfo.INSTANCE.getExternalProperties();
         Boolean plot = xProps.getIsPlottingEnabled();
         if (plot) {
             PlanPlotter.printLogicalPlan(plan);
         }
 
-        AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+        AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.INSTANCE.getCompilerProperties();
         int frameSize = compilerProperties.getFrameSize();
         int sortFrameLimit = (int) (compilerProperties.getSortMemorySize() / frameSize);
         int groupFrameLimit = (int) (compilerProperties.getGroupMemorySize() / frameSize);
@@ -340,7 +340,7 @@
 
         JobEventListenerFactory jobEventListenerFactory =
                 new JobEventListenerFactory(asterixJobId, queryMetadataProvider.isWriteTransaction());
-        JobSpecification spec = compiler.createJob(AsterixAppContextInfo.getInstance(), jobEventListenerFactory);
+        JobSpecification spec = compiler.createJob(AsterixAppContextInfo.INSTANCE, jobEventListenerFactory);
 
         if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
             printPlanPrefix(conf, "Hyracks job");
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
index eb23902..c7cf1ea 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
@@ -26,7 +26,7 @@
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.asterix.app.result.ResultUtil;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.json.JSONException;
 import org.json.JSONObject;
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryWebInterfaceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryWebInterfaceServlet.java
index 92c8e8a..524c87f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryWebInterfaceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryWebInterfaceServlet.java
@@ -18,10 +18,6 @@
  */
 package org.apache.asterix.api.http.servlet;
 
-import org.apache.asterix.common.config.AsterixExternalProperties;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.commons.io.IOUtils;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -34,10 +30,11 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.asterix.common.config.AsterixExternalProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
 import org.codehaus.jettison.json.JSONObject;
 
 public class QueryWebInterfaceServlet extends HttpServlet {
@@ -112,7 +109,7 @@
     public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException {
         response.setCharacterEncoding("utf-8");
         response.setContentType("application/json");
-        AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
+        AsterixExternalProperties externalProperties = AsterixAppContextInfo.INSTANCE.getExternalProperties();
         JSONObject obj = new JSONObject();
         try {
             PrintWriter out = response.getWriter();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java
index dd2f799..c786dc7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.api.http.servlet;
 
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Date;
@@ -30,12 +32,10 @@
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.json.JSONObject;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-
 public class ShutdownAPIServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
 
@@ -60,7 +60,7 @@
         try {
             jsonObject.put("status", "SHUTTING_DOWN");
             jsonObject.put("date", new Date());
-            jsonObject.put("cluster" , AsterixClusterProperties.INSTANCE.getClusterStateDescription());
+            jsonObject.put("cluster", AsterixClusterProperties.INSTANCE.getClusterStateDescription());
 
             final PrintWriter writer = response.getWriter();
             writer.print(jsonObject.toString(4));
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/VersionAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/VersionAPIServlet.java
index a426d50..20ecaa5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/VersionAPIServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/VersionAPIServlet.java
@@ -27,7 +27,7 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.json.JSONObject;
 
 import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/AsterixResourceIdManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/AsterixResourceIdManager.java
new file mode 100644
index 0000000..a773bab
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/AsterixResourceIdManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.cc;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+
+public class AsterixResourceIdManager implements IAsterixResourceIdManager {
+
+    private final AtomicLong globalResourceId = new AtomicLong();
+    private volatile Set<String> reportedNodes = new HashSet<>();
+    private volatile boolean allReported = false;
+
+    @Override
+    public long createResourceId() {
+        if (!allReported) {
+            synchronized (this) {
+                if (!allReported) {
+                    if (reportedNodes.size() < AsterixClusterProperties.getNumberOfNodes()) {
+                        return -1;
+                    } else {
+                        reportedNodes = null;
+                        allReported = true;
+                    }
+                }
+            }
+        }
+        return globalResourceId.incrementAndGet();
+    }
+
+    @Override
+    public synchronized boolean reported(String nodeId) {
+        return allReported || reportedNodes.contains(nodeId);
+    }
+
+    @Override
+    public synchronized void report(String nodeId, long maxResourceId) {
+        if (!allReported) {
+            globalResourceId.set(Math.max(maxResourceId, globalResourceId.get()));
+            reportedNodes.add(nodeId);
+            if (reportedNodes.size() == AsterixClusterProperties.getNumberOfNodes()) {
+                reportedNodes = null;
+                allReported = true;
+            }
+        }
+    }
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
index 6922379..c8a9566 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalIndexingOperations.java
@@ -38,6 +38,7 @@
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
@@ -68,12 +69,11 @@
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
 import org.apache.asterix.transaction.management.resource.ExternalBTreeLocalResourceMetadata;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
 import org.apache.hadoop.conf.Configuration;
@@ -212,7 +212,7 @@
             ArrayList<ExternalFile> externalFilesSnapshot, AqlMetadataProvider metadataProvider, boolean createIndex)
             throws MetadataException, AlgebricksException {
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.INSTANCE;
         AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
                 metadataProvider.getMetadataTxnContext());
@@ -226,8 +226,9 @@
         ILocalResourceMetadata localResourceMetadata = new ExternalBTreeLocalResourceMetadata(
                 filesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS, filesIndexDescription.FILES_INDEX_COMP_FACTORIES,
                 new int[] { 0 }, false, dataset.getDatasetId(), mergePolicyFactory, mergePolicyFactoryProperties);
-        PersistentLocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
-                localResourceMetadata, LocalResource.ExternalBTreeResource);
+        PersistentLocalResourceFactoryProvider localResourceFactoryProvider =
+                new PersistentLocalResourceFactoryProvider(
+                        localResourceMetadata, LocalResource.ExternalBTreeResource);
         ExternalBTreeDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeDataflowHelperFactory(
                 mergePolicyFactory, mergePolicyFactoryProperties,
                 new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
@@ -257,9 +258,10 @@
      * @throws AsterixException
      * @throws Exception
      */
-    private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint> getExternalDataIndexingOperator(
-            AqlMetadataProvider metadataProvider, JobSpecification jobSpec, IAType itemType, Dataset dataset,
-            List<ExternalFile> files, RecordDescriptor indexerDesc) throws AsterixException {
+    private static Pair<ExternalDataScanOperatorDescriptor, AlgebricksPartitionConstraint>
+            getExternalDataIndexingOperator(
+                    AqlMetadataProvider metadataProvider, JobSpecification jobSpec, IAType itemType, Dataset dataset,
+                    List<ExternalFile> files, RecordDescriptor indexerDesc) throws AsterixException {
         ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
         Map<String, String> configuration = externalDatasetDetails.getProperties();
         IAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory(
@@ -406,7 +408,7 @@
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
                 .splitProviderAndPartitionConstraintsForFilesIndex(dataverseName, datasetName, indexName, true);
-        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        AsterixStorageProperties storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
                 metadataProvider.getMetadataTxnContext());
         IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec,
@@ -480,7 +482,7 @@
     public static JobSpecification buildCommitJob(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
             throws AlgebricksException, AsterixException {
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.INSTANCE;
         AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
                 metadataProvider.getMetadataTxnContext());
@@ -571,8 +573,10 @@
         boolean isPointMBR = spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
         int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
         int numNestedSecondaryKeyFields = numDimensions * 2;
-        IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-        IBinaryComparatorFactory[] secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+        IPrimitiveValueProviderFactory[] valueProviderFactories =
+                new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        IBinaryComparatorFactory[] secondaryComparatorFactories =
+                new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
 
         ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
                 + numNestedSecondaryKeyFields];
@@ -613,7 +617,7 @@
     public static JobSpecification buildAbortOp(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
             throws AlgebricksException, AsterixException {
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.INSTANCE;
         AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
                 metadataProvider.getMetadataTxnContext());
@@ -671,7 +675,7 @@
     public static JobSpecification buildRecoverOp(Dataset ds, List<Index> indexes, AqlMetadataProvider metadataProvider)
             throws AlgebricksException, AsterixException {
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.INSTANCE;
         AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(ds,
                 metadataProvider.getMetadataTxnContext());
@@ -728,7 +732,7 @@
     public static JobSpecification compactFilesIndexJobSpec(Dataset dataset, AqlMetadataProvider metadataProvider)
             throws MetadataException, AlgebricksException {
         JobSpecification spec = JobSpecificationUtils.createJobSpecification();
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.INSTANCE;
         AsterixStorageProperties storageProperties = asterixPropertiesProvider.getStorageProperties();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
                 metadataProvider.getMetadataTxnContext());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
index aab1aa3..110ace6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedOperations.java
@@ -42,7 +42,7 @@
 import org.apache.asterix.file.JobSpecificationUtils;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
index b815602..7f31d20 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java
@@ -36,7 +36,7 @@
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.struct.Identifier;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -92,7 +92,7 @@
                     statements.add(dataverseDecl);
                     statements.add(subscribeStmt);
                     IStatementExecutor translator = qtFactory.create(statements, pc, compilationProvider);
-                    translator.compileAndExecute(AsterixAppContextInfo.getInstance().getHcc(), null,
+                    translator.compileAndExecute(AsterixAppContextInfo.INSTANCE.getHcc(), null,
                             QueryTranslator.ResultDelivery.SYNC);
                     if (LOGGER.isEnabledFor(Level.INFO)) {
                         LOGGER.info("Submitted connection requests for execution: " + request);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
index 38d7da3..077600a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
@@ -51,6 +51,7 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
@@ -59,20 +60,19 @@
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataNode;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
-import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.replication.management.ReplicationChannel;
 import org.apache.asterix.replication.management.ReplicationManager;
 import org.apache.asterix.replication.recovery.RemoteRecoveryManager;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.asterix.transaction.management.resource.GlobalResourceIdFactoryProvider;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.hyracks.api.application.IApplicationConfig;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
index 99235be..e151963 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.app.result;
 
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
@@ -39,7 +39,7 @@
     // Number of parallel result reader buffers
     public static final int NUM_READERS = 1;
 
-    public static final int FRAME_SIZE = AsterixAppContextInfo.getInstance().getCompilerProperties().getFrameSize();
+    public static final int FRAME_SIZE = AsterixAppContextInfo.INSTANCE.getCompilerProperties().getFrameSize();
 
     public ResultReader(IHyracksDataset hdc) {
         hyracksDataset = hdc;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 727a1f9..b430807 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -154,9 +154,9 @@
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeSignature;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.optimizer.rules.IntroduceSecondaryIndexInsertDeleteRule;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
 import org.apache.asterix.translator.CompiledStatements.CompiledConnectFeedStatement;
@@ -740,9 +740,9 @@
             } else {
                 nodegroupCardinality = Integer.parseInt(dd.getHints().get(DatasetNodegroupCardinalityHint.NAME));
             }
-            List<String> nodeNames = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames();
+            List<String> nodeNames = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodeNames();
             List<String> nodeNamesClone = new ArrayList<>(nodeNames);
-            String metadataNodeName = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
+            String metadataNodeName = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
             List<String> selectedNodes = new ArrayList<>();
             selectedNodes.add(metadataNodeName);
             numChosen++;
@@ -2901,7 +2901,7 @@
             if (pregelixHome == null) {
                 // Since there is a default value for PREGELIX_HOME in AsterixCompilerProperties,
                 // pregelixHome can never be null.
-                pregelixHome = AsterixAppContextInfo.getInstance().getCompilerProperties().getPregelixHome();
+                pregelixHome = AsterixAppContextInfo.INSTANCE.getCompilerProperties().getPregelixHome();
             }
 
             // Constructs the pregelix command line.
@@ -3024,7 +3024,7 @@
     protected List<String> constructPregelixCommand(RunStatement pregelixStmt, String fromDataverseName,
             String fromDatasetName, String toDataverseName, String toDatasetName) {
         // Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset.
-        AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
+        AsterixExternalProperties externalProperties = AsterixAppContextInfo.INSTANCE.getExternalProperties();
         AsterixClusterProperties clusterProperties = AsterixClusterProperties.INSTANCE;
         String clientIP = clusterProperties.getCluster().getMasterNode().getClientIp();
         StringBuilder asterixdbParameterBuilder = new StringBuilder();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
index 46b9c35..8798817 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/DatasetOperations.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
@@ -38,11 +39,10 @@
 import org.apache.asterix.metadata.entities.Dataverse;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
 import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.translator.CompiledStatements.CompiledDatasetDropStatement;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -114,7 +114,7 @@
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
                 .splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(), datasetName, datasetName,
                         temp);
-        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        AsterixStorageProperties storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
                 metadataProvider.getMetadataTxnContext());
 
@@ -180,7 +180,7 @@
 
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
                 metadata.getMetadataTxnContext());
-        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        AsterixStorageProperties storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
         //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
         ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
                 comparatorFactories, bloomFilterKeyFields, true, dataset.getDatasetId(), compactionInfo.first,
@@ -237,7 +237,7 @@
         int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
                 .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, datasetName, temp);
-        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        AsterixStorageProperties storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(dataset,
                 metadata.getMetadataTxnContext());
         LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
index 6ed8db9..3696a36 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/IndexOperations.java
@@ -26,15 +26,15 @@
 import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledIndexCompactStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledIndexDropStatement;
@@ -103,7 +103,7 @@
 
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
                 .splitProviderAndPartitionConstraintsForDataset(dataverseName, datasetName, indexName, temp);
-        AsterixStorageProperties storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
+        AsterixStorageProperties storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtils.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java
index d6bfbaa..2a4ab04 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/JobSpecificationUtils.java
@@ -19,12 +19,12 @@
 package org.apache.asterix.file;
 
 import org.apache.asterix.common.config.AsterixCompilerProperties;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.hyracks.api.job.JobSpecification;
 
 public class JobSpecificationUtils {
     public static JobSpecification createJobSpecification() {
-        AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+        AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.INSTANCE.getCompilerProperties();
         int frameSize = compilerProperties.getFrameSize();
         JobSpecification spec = new JobSpecification(frameSize);
         return spec;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
index 9e2e0b0..d257dcd 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryBTreeOperationsHelper.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
@@ -41,7 +42,6 @@
 import org.apache.asterix.transaction.management.resource.ExternalBTreeWithBuddyLocalResourceMetadata;
 import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
index 8ebe246..4eced73 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryIndexOperationsHelper.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.common.context.TransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.ExternalFile;
@@ -51,15 +52,14 @@
 import org.apache.asterix.metadata.utils.DatasetUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -158,7 +158,7 @@
             boolean isEnforced, int gramLength, AqlMetadataProvider metadataProvider,
             PhysicalOptimizationConfig physOptConf, ARecordType recType, ARecordType metaType,
             List<Integer> keySourceIndicators, ARecordType enforcedType) throws AsterixException, AlgebricksException {
-        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.getInstance();
+        IAsterixPropertiesProvider asterixPropertiesProvider = AsterixAppContextInfo.INSTANCE;
         SecondaryIndexOperationsHelper indexOperationsHelper = null;
         switch (indexType) {
             case BTREE: {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
index c2d2f7c..2aa6d97 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryInvertedIndexOperationsHelper.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.om.types.IAType;
@@ -35,7 +36,6 @@
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
 import org.apache.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadata;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
index cc9675d..d9a7e4e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/file/SecondaryRTreeOperationsHelper.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalDataScanOperatorDescriptor;
@@ -46,7 +47,6 @@
 import org.apache.asterix.transaction.management.resource.ExternalRTreeLocalResourceMetadata;
 import org.apache.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 6a2456d..5b36782 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.hyracks.bootstrap;
 
+import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Level;
@@ -25,6 +28,7 @@
 
 import javax.servlet.Servlet;
 
+import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.api.http.servlet.APIServlet;
 import org.apache.asterix.api.http.servlet.AQLAPIServlet;
 import org.apache.asterix.api.http.servlet.ClusterAPIServlet;
@@ -39,8 +43,8 @@
 import org.apache.asterix.api.http.servlet.ShutdownAPIServlet;
 import org.apache.asterix.api.http.servlet.UpdateAPIServlet;
 import org.apache.asterix.api.http.servlet.VersionAPIServlet;
+import org.apache.asterix.app.cc.AsterixResourceIdManager;
 import org.apache.asterix.app.cc.CompilerExtensionManager;
-import org.apache.asterix.app.external.ActiveLifecycleListener;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.common.api.AsterixThreadFactory;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
@@ -56,8 +60,8 @@
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
 import org.apache.asterix.metadata.cluster.ClusterManager;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.HyracksConnection;
@@ -69,9 +73,6 @@
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 
-import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_BUILD_PROP_ATTR;
-import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-
 public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
     private static final Logger LOGGER = Logger.getLogger(CCApplicationEntryPoint.class.getName());
@@ -94,11 +95,12 @@
         appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager()));
         GlobalRecoveryManager.instantiate((HyracksConnection) getNewHyracksClientConnection());
         ILibraryManager libraryManager = new ExternalLibraryManager();
+        AsterixResourceIdManager resourceIdManager = new AsterixResourceIdManager();
         ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
         AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), GlobalRecoveryManager.instance(),
-                libraryManager);
+                libraryManager, resourceIdManager);
         ccExtensionManager = new CompilerExtensionManager(getExtensions());
-        AsterixAppContextInfo.getInstance().setExtensionManager(ccExtensionManager);
+        AsterixAppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager);
 
         if (System.getProperty("java.rmi.server.hostname") == null) {
             System.setProperty("java.rmi.server.hostname",
@@ -108,10 +110,10 @@
         setAsterixStateProxy(AsterixStateProxy.registerRemoteObject());
         appCtx.setDistributedState(proxy);
 
-        AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.getInstance().getMetadataProperties();
+        AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.INSTANCE.getMetadataProperties();
         MetadataManager.instantiate(new MetadataManager(proxy, metadataProperties));
 
-        AsterixAppContextInfo.getInstance().getCCApplicationContext()
+        AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
                 .addJobLifecycleListener(ActiveLifecycleListener.INSTANCE);
 
         servers = configureServers();
@@ -127,11 +129,11 @@
     }
 
     protected List<AsterixExtension> getExtensions() {
-        return AsterixAppContextInfo.getInstance().getExtensionProperties().getExtensions();
+        return AsterixAppContextInfo.INSTANCE.getExtensionProperties().getExtensions();
     }
 
     protected List<Server> configureServers() throws Exception {
-        AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
+        AsterixExternalProperties externalProperties = AsterixAppContextInfo.INSTANCE.getExternalProperties();
 
         List<Server> serverList = new ArrayList<>();
         serverList.add(setupWebServer(externalProperties));
@@ -190,7 +192,7 @@
 
         IHyracksClientConnection hcc = getNewHyracksClientConnection();
         context.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
-        context.setAttribute(ASTERIX_BUILD_PROP_ATTR, AsterixAppContextInfo.getInstance());
+        context.setAttribute(ASTERIX_BUILD_PROP_ATTR, AsterixAppContextInfo.INSTANCE);
 
         jsonAPIServer.setHandler(context);
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 26f4bc8..5fc91a4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -39,7 +39,7 @@
 import org.apache.asterix.metadata.cluster.ClusterManager;
 import org.apache.asterix.metadata.cluster.RemoveNodeWork;
 import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
 
 public class ClusterLifecycleListener implements IClusterLifecycleListener {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
index 529d38a..0e3bb3b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
@@ -30,7 +30,7 @@
 import org.apache.asterix.metadata.cluster.AddNodeWork;
 import org.apache.asterix.metadata.cluster.ClusterManager;
 import org.apache.asterix.metadata.cluster.RemoveNodeWork;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 
 public class ClusterWorkExecutor implements Runnable {
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index bbd400b..7dde284 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -41,7 +41,7 @@
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index a8bc48f..c11b875 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.event.schema.cluster.Cluster;
@@ -44,8 +45,8 @@
 import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
 import org.apache.asterix.messaging.NCMessageBroker;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCApplicationContext;
@@ -210,8 +211,7 @@
     @Override
     public void notifyStartupComplete() throws Exception {
         //Send max resource id on this NC to the CC
-        ((NCMessageBroker) ncApplicationContext.getMessageBroker()).reportMaxResourceId();
-
+        ReportMaxResourceIdMessage.send((NodeControllerService) ncApplicationContext.getControllerService());
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getMetadataProperties();
         if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 27a5365..1345aa7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -18,27 +18,13 @@
  */
 package org.apache.asterix.messaging;
 
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.app.external.ActiveLifecycleListener;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
-import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
-import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
-import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
-import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
@@ -47,10 +33,7 @@
 public class CCMessageBroker implements ICCMessageBroker {
 
     private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
-    private final AtomicLong globalResourceId = new AtomicLong(0);
     private final ClusterControllerService ccs;
-    private final Set<String> nodesReportedMaxResourceId = new HashSet<>();
-    public static final long NO_CALLBACK_MESSAGE_ID = -1;
 
     public CCMessageBroker(ClusterControllerService ccs) {
         this.ccs = ccs;
@@ -60,63 +43,9 @@
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
         AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Received message: " + absMessage.getMessageType().name());
+            LOGGER.info("Received message: " + absMessage.type());
         }
-        switch (absMessage.getMessageType()) {
-            case RESOURCE_ID_REQUEST:
-                handleResourceIdRequest(message, nodeId);
-                break;
-            case REPORT_MAX_RESOURCE_ID_RESPONSE:
-                handleReportResourceMaxIdResponse(message, nodeId);
-                break;
-            case TAKEOVER_PARTITIONS_RESPONSE:
-                handleTakeoverPartitionsResponse(message);
-                break;
-            case TAKEOVER_METADATA_NODE_RESPONSE:
-                handleTakeoverMetadataNodeResponse(message);
-                break;
-            case PREPARE_PARTITIONS_FAILBACK_RESPONSE:
-                handleClosePartitionsResponse(message);
-                break;
-            case COMPLETE_FAILBACK_RESPONSE:
-                handleCompleteFailbcakResponse(message);
-                break;
-            case ACTIVE_ENTITY_TO_CC_MESSAGE:
-                handleActiveEntityMessage(message);
-                break;
-            default:
-                LOGGER.warning("Unknown message: " + absMessage.getMessageType());
-                break;
-        }
-    }
-
-    private void handleActiveEntityMessage(IMessage message) {
-        ActiveLifecycleListener.INSTANCE.receive((ActivePartitionMessage) message);
-    }
-
-    private synchronized void handleResourceIdRequest(IMessage message, String nodeId) throws Exception {
-        ResourceIdRequestMessage msg = (ResourceIdRequestMessage) message;
-        ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
-        reponse.setId(msg.getId());
-        //cluster is not active
-        if (!AsterixClusterProperties.INSTANCE.isClusterActive()) {
-            reponse.setResourceId(-1);
-            reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
-        } else if (nodesReportedMaxResourceId.size() < AsterixClusterProperties.getNumberOfNodes()) {
-            //some node has not reported max resource id
-            reponse.setResourceId(-1);
-            reponse.setException(new Exception("One or more nodes has not reported max resource id."));
-            requestMaxResourceID();
-        } else {
-            reponse.setResourceId(globalResourceId.incrementAndGet());
-        }
-        sendApplicationMessageToNC(reponse, nodeId);
-    }
-
-    private synchronized void handleReportResourceMaxIdResponse(IMessage message, String nodeId) throws Exception {
-        ReportMaxResourceIdMessage msg = (ReportMaxResourceIdMessage) message;
-        globalResourceId.set(Math.max(msg.getMaxResourceId(), globalResourceId.get()));
-        nodesReportedMaxResourceId.add(nodeId);
+        absMessage.handle(ccs);
     }
 
     @Override
@@ -125,36 +54,4 @@
         NodeControllerState state = nodeMap.get(nodeId);
         state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
     }
-
-    private void requestMaxResourceID() throws Exception {
-        //send request to NCs that have not reported their max resource ids
-        Set<String> getParticipantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
-        ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
-        msg.setId(NO_CALLBACK_MESSAGE_ID);
-        for (String nodeId : getParticipantNodes) {
-            if (!nodesReportedMaxResourceId.contains(nodeId)) {
-                sendApplicationMessageToNC(msg, nodeId);
-            }
-        }
-    }
-
-    private void handleTakeoverPartitionsResponse(IMessage message) {
-        TakeoverPartitionsResponseMessage msg = (TakeoverPartitionsResponseMessage) message;
-        AsterixClusterProperties.INSTANCE.processPartitionTakeoverResponse(msg);
-    }
-
-    private void handleTakeoverMetadataNodeResponse(IMessage message) {
-        TakeoverMetadataNodeResponseMessage msg = (TakeoverMetadataNodeResponseMessage) message;
-        AsterixClusterProperties.INSTANCE.processMetadataNodeTakeoverResponse(msg);
-    }
-
-    private void handleCompleteFailbcakResponse(IMessage message) {
-        CompleteFailbackResponseMessage msg = (CompleteFailbackResponseMessage) message;
-        AsterixClusterProperties.INSTANCE.processCompleteFailbackResponse(msg);
-    }
-
-    private void handleClosePartitionsResponse(IMessage message) {
-        PreparePartitionsFailbackResponseMessage msg = (PreparePartitionsFailbackResponseMessage) message;
-        AsterixClusterProperties.INSTANCE.processPreparePartitionsFailbackResponse(msg);
-    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index c7e4ac8..9851b61 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -27,30 +27,13 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveManager;
-import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
-import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
-import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.common.messaging.ReplicaEventMessage;
-import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
-import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.replication.IRemoteRecoveryManager;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.messages.IMessage;
@@ -112,49 +95,16 @@
 
     @Override
     public void receivedMessage(IMessage message, String nodeId) throws Exception {
-        try {
-            AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Received message: " + absMessage.getMessageType().name());
-            }
-            //if the received message is a response to a sent message, deliver it to the sender
-            IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
-            if (callback != null) {
-                callback.deliverMessageResponse(absMessage);
-            }
-
-            //handle requests from CC
-            switch (absMessage.getMessageType()) {
-                case REPORT_MAX_RESOURCE_ID_REQUEST:
-                    reportMaxResourceId();
-                    break;
-                case TAKEOVER_PARTITIONS_REQUEST:
-                    handleTakeoverPartitons(message);
-                    break;
-                case TAKEOVER_METADATA_NODE_REQUEST:
-                    handleTakeoverMetadataNode(message);
-                    break;
-                case PREPARE_PARTITIONS_FAILBACK_REQUEST:
-                    handlePreparePartitionsFailback(message);
-                    break;
-                case COMPLETE_FAILBACK_REQUEST:
-                    handleCompleteFailbackRequest(message);
-                    break;
-                case REPLICA_EVENT:
-                    handleReplicaEvent(message);
-                    break;
-                case ACTIVE_MANAGER_MESSAGE:
-                    ((ActiveManager) appContext.getActiveManager()).submit((ActiveManagerMessage) message);
-                    break;
-                default:
-                    break;
-            }
-        } catch (Exception e) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.log(Level.WARNING, e.getMessage(), e);
-            }
-            throw e;
+        AbstractApplicationMessage absMessage = (AbstractApplicationMessage) message;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Received message: " + absMessage.type());
         }
+        //if the received message is a response to a sent message, deliver it to the sender
+        IApplicationMessageCallback callback = callbacks.remove(absMessage.getId());
+        if (callback != null) {
+            callback.deliverMessageResponse(absMessage);
+        }
+        absMessage.handle(ncs);
     }
 
     public ConcurrentFramePool getMessagingFramePool() {
@@ -190,96 +140,6 @@
         ccb.getWriteInterface().getFullBufferAcceptor().accept(msgBuffer);
     }
 
-    private void handleTakeoverPartitons(IMessage message) throws Exception {
-        TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
-        //if the NC is shutting down, it should ignore takeover partitions request
-        if (!appContext.isShuttingdown()) {
-            try {
-                IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
-                remoteRecoeryManager.takeoverPartitons(msg.getPartitions());
-            } finally {
-                //send response after takeover is completed
-                TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
-                        appContext.getTransactionSubsystem().getId(), msg.getPartitions());
-                sendMessageToCC(reponse, null);
-            }
-        }
-    }
-
-    private void handleTakeoverMetadataNode(IMessage message) throws Exception {
-        try {
-            appContext.initializeMetadata(false);
-            appContext.exportMetadataNodeStub();
-        } finally {
-            TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
-                    appContext.getTransactionSubsystem().getId());
-            sendMessageToCC(reponse, null);
-        }
-    }
-
-    public void reportMaxResourceId() throws Exception {
-        ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage();
-        //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
-        long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
-                MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
-        maxResourceIdMsg.setMaxResourceId(maxResourceId);
-        sendMessageToCC(maxResourceIdMsg, null);
-    }
-
-    private void handleReplicaEvent(IMessage message) {
-        ReplicaEventMessage msg = (ReplicaEventMessage) message;
-        Node node = new Node();
-        node.setId(msg.getNodeId());
-        node.setClusterIp(msg.getNodeIPAddress());
-        Replica replica = new Replica(node);
-        ReplicaEvent event = new ReplicaEvent(replica, msg.getEvent());
-        appContext.getReplicationManager().reportReplicaEvent(event);
-    }
-
-    private void handlePreparePartitionsFailback(IMessage message) throws Exception {
-        PreparePartitionsFailbackRequestMessage msg = (PreparePartitionsFailbackRequestMessage) message;
-        /**
-         * if the metadata partition will be failed back
-         * we need to flush and close all datasets including metadata datasets
-         * otherwise we need to close all non-metadata datasets and flush metadata datasets
-         * so that their memory components will be copied to the failing back node
-         */
-        if (msg.isReleaseMetadataNode()) {
-            appContext.getDatasetLifecycleManager().closeAllDatasets();
-            //remove the metadata node stub from RMI registry
-            appContext.unexportMetadataNodeStub();
-        } else {
-            //close all non-metadata datasets
-            appContext.getDatasetLifecycleManager().closeUserDatasets();
-            //flush the remaining metadata datasets that were not closed
-            appContext.getDatasetLifecycleManager().flushAllDatasets();
-        }
-
-        //mark the partitions to be closed as inactive
-        PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
-                .getLocalResourceRepository();
-        for (Integer partitionId : msg.getPartitions()) {
-            localResourceRepo.addInactivePartition(partitionId);
-        }
-
-        //send response after partitions prepared for failback
-        PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
-                msg.getRequestId(), msg.getPartitions());
-        sendMessageToCC(reponse, null);
-    }
-
-    private void handleCompleteFailbackRequest(IMessage message) throws Exception {
-        CompleteFailbackRequestMessage msg = (CompleteFailbackRequestMessage) message;
-        try {
-            IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
-            remoteRecoeryManager.completeFailbackProcess();
-        } finally {
-            CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(msg.getPlanId(),
-                    msg.getRequestId(), msg.getPartitions());
-            sendMessageToCC(reponse, null);
-        }
-    }
-
     private class MessageDeliveryService implements Runnable {
         /*
          * TODO Currently this thread is not stopped when it is interrupted because
@@ -302,7 +162,7 @@
                 } catch (Exception e) {
                     if (LOGGER.isLoggable(Level.WARNING) && msg != null) {
                         LOGGER.log(Level.WARNING, "Could not process message with id: " + msg.getId() + " and type: "
-                                + msg.getMessageType().name(), e);
+                                + msg.type(), e);
                     } else {
                         if (LOGGER.isLoggable(Level.WARNING)) {
                             LOGGER.log(Level.WARNING, "Could not process message", e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
index 7536c70..5f99dbf 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FlushDatasetUtils.java
@@ -23,9 +23,9 @@
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.std.FlushDatasetOperatorDescriptor;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
@@ -44,8 +44,8 @@
 
     public static void flushDataset(IHyracksClientConnection hcc, AqlMetadataProvider metadataProvider,
             MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, String indexName)
-                    throws Exception {
-        AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
+            throws Exception {
+        AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.INSTANCE.getCompilerProperties();
         int frameSize = compilerProperties.getFrameSize();
         JobSpecification spec = new JobSpecification(frameSize);
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java
index 015088a..1c190c9 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionAPIServletTest.java
@@ -32,7 +32,7 @@
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.asterix.common.config.AsterixBuildProperties;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.test.runtime.ExecutionTest;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.json.JSONObject;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 7e35f11..a548b3a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -36,6 +36,7 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -51,7 +52,6 @@
 import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
 import org.apache.asterix.transaction.management.service.logging.LogReader;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index 3591509..4e8875b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -36,8 +36,8 @@
 import org.apache.asterix.event.schema.cluster.MasterNode;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.statement.RunStatement;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index c73be7a..39a4d3b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -31,7 +31,7 @@
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.IdentitiyResolverFactory;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.testframework.xml.TestGroup;
 import org.apache.asterix.testframework.xml.TestSuite;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -84,7 +84,7 @@
 
         List<ILibraryManager> libraryManagers = new ArrayList<>();
         // Adds the library manager for CC.
-        libraryManagers.add(AsterixAppContextInfo.getInstance().getLibraryManager());
+        libraryManagers.add(AsterixAppContextInfo.INSTANCE.getLibraryManager());
         // Adds library managers for NCs, one-per-NC.
         for (NodeControllerService nc : integrationUtil.ncs) {
             IAsterixAppRuntimeContext runtimeCtx =
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index bfdc834..c01499e 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -263,6 +263,11 @@
       <version>2.0.2-beta</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>18.0</version>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
similarity index 81%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
index d986648..b9d187d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataExceptionUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -16,31 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.util;
-
-import java.util.Arrays;
+package org.apache.asterix.common.exceptions;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class ExternalDataExceptionUtils {
+public class ExceptionUtils {
     public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
     public static final String MISSING_PARAMETER = "Missing parameter.\n";
     public static final String PARAMETER_NAME = "Parameter name: ";
     public static final String EXPECTED_VALUE = "Expected value: ";
     public static final String PASSED_VALUE = "Passed value: ";
 
+    private ExceptionUtils() {
+    }
+
     public static String incorrectParameterMessage(String parameterName, String expectedValue, String passedValue) {
-        return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + ExternalDataConstants.LF + EXPECTED_VALUE
-                + expectedValue + ExternalDataConstants.LF + PASSED_VALUE + passedValue;
-    }
-
-    public static String concat(String... vals) {
-        return Arrays.toString(vals);
-    }
-
-    // For now, we are accepting all exceptions as resolvable by adapter.
-    public static boolean isResolvable(Exception e) {
-        return true;
+        return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + System.lineSeparator() + EXPECTED_VALUE
+                + expectedValue + System.lineSeparator() + PASSED_VALUE + passedValue;
     }
 
     public static HyracksDataException suppressIntoHyracksDataException(HyracksDataException hde, Throwable th) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
index fbb9b86..5737957 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractApplicationMessage.java
@@ -22,7 +22,7 @@
 
 public abstract class AbstractApplicationMessage implements IApplicationMessage {
     private static final long serialVersionUID = 1L;
-    private long id;
+    protected long id;
 
     @Override
     public void setId(long id) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java
deleted file mode 100644
index 6518ae5..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackRequestMessage.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import java.util.Set;
-
-public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
-    private final String nodeId;
-
-    public CompleteFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.nodeId = nodeId;
-        this.partitions = partitions;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.COMPLETE_FAILBACK_REQUEST;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
-        sb.append(" Node ID: " + nodeId);
-        sb.append(" Partitions: " + partitions);
-        return sb.toString();
-    }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
deleted file mode 100644
index 02fe917..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import java.util.Set;
-
-public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
-    private boolean releaseMetadataNode = false;
-    private final String nodeID;
-
-    public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
-        super(planId, requestId);
-        this.nodeID = nodeId;
-        this.partitions = partitions;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.PREPARE_PARTITIONS_FAILBACK_REQUEST;
-    }
-
-    public Set<Integer> getPartitions() {
-        return partitions;
-    }
-
-    public boolean isReleaseMetadataNode() {
-        return releaseMetadataNode;
-    }
-
-    public void setReleaseMetadataNode(boolean releaseMetadataNode) {
-        this.releaseMetadataNode = releaseMetadataNode;
-    }
-
-    public String getNodeID() {
-        return nodeID;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Plan ID: " + planId);
-        sb.append(" Partitions: " + partitions);
-        sb.append(" releaseMetadataNode: " + releaseMetadataNode);
-        return sb.toString();
-    }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
deleted file mode 100644
index bf0219c..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
-
-public class ReplicaEventMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final String nodeId;
-    private final ClusterEventType event;
-    private final String nodeIPAddress;
-
-    public ReplicaEventMessage(String nodeId, String nodeIPAddress, ClusterEventType event) {
-        this.nodeId = nodeId;
-        this.nodeIPAddress = nodeIPAddress;
-        this.event = event;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.REPLICA_EVENT;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public ClusterEventType getEvent() {
-        return event;
-    }
-
-    public String getNodeIPAddress() {
-        return nodeIPAddress;
-    }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
deleted file mode 100644
index a2b94a7..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
-    private static final long serialVersionUID = 1L;
-    public long maxResourceId;
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_RESPONSE;
-    }
-
-    public long getMaxResourceId() {
-        return maxResourceId;
-    }
-
-    public void setMaxResourceId(long maxResourceId) {
-        this.maxResourceId = maxResourceId;
-    }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
deleted file mode 100644
index f7a276b..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.TAKEOVER_METADATA_NODE_REQUEST;
-    }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
deleted file mode 100644
index 5d07d5d..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.messaging;
-
-public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
-
-    private static final long serialVersionUID = 1L;
-    private final Integer[] partitions;
-    private final long requestId;
-    private final String nodeId;
-
-    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
-        this.requestId = requestId;
-        this.nodeId = nodeId;
-        this.partitions = partitionsToTakeover;
-    }
-
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.TAKEOVER_PARTITIONS_REQUEST;
-    }
-
-    public Integer[] getPartitions() {
-        return partitions;
-    }
-
-    public long getRequestId() {
-        return requestId;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Request ID: " + requestId);
-        sb.append(" Node ID: " + nodeId);
-        sb.append(" Partitions: ");
-        for (Integer partitionId : partitions) {
-            sb.append(partitionId + ",");
-        }
-        //remove last comma
-        sb.charAt(sb.length() - 1);
-        return sb.toString();
-    }
-}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 5f08dd8..b93082d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -18,30 +18,12 @@
  */
 package org.apache.asterix.common.messaging.api;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.messages.IMessage;
+import org.apache.hyracks.api.service.IControllerService;
 
 public interface IApplicationMessage extends IMessage {
 
-    public enum ApplicationMessageType {
-        RESOURCE_ID_REQUEST,
-        RESOURCE_ID_RESPONSE,
-        REPORT_MAX_RESOURCE_ID_REQUEST,
-        REPORT_MAX_RESOURCE_ID_RESPONSE,
-        TAKEOVER_PARTITIONS_REQUEST,
-        TAKEOVER_PARTITIONS_RESPONSE,
-        TAKEOVER_METADATA_NODE_REQUEST,
-        TAKEOVER_METADATA_NODE_RESPONSE,
-        PREPARE_PARTITIONS_FAILBACK_REQUEST,
-        PREPARE_PARTITIONS_FAILBACK_RESPONSE,
-        COMPLETE_FAILBACK_REQUEST,
-        COMPLETE_FAILBACK_RESPONSE,
-        REPLICA_EVENT,
-        ACTIVE_ENTITY_TO_CC_MESSAGE,
-        ACTIVE_MANAGER_MESSAGE
-    }
-
-    public abstract ApplicationMessageType getMessageType();
-
     /**
      * Sets a unique message id that identifies this message within an NC.
      * This id is set by {@link INCMessageBroker#sendMessageToCC(IApplicationMessage, IApplicationMessageCallback)}
@@ -55,4 +37,16 @@
      * @return The unique message id if it has been set, otherwise 0.
      */
     public long getId();
+
+    /**
+     * handle the message upon delivery
+     */
+    public void handle(IControllerService cs) throws HyracksDataException;
+
+    /**
+     * get a string representation for the message type
+     *
+     * @return
+     */
+    public String type();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 7dafbd5..2290b93 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -21,9 +21,11 @@
 import org.apache.hyracks.api.messages.IMessageBroker;
 
 public interface ICCMessageBroker extends IMessageBroker {
+    public static final long NO_CALLBACK_MESSAGE_ID = -1;
 
     /**
      * Sends the passed message to the specified {@code nodeId}
+     *
      * @param msg
      * @param nodeId
      * @throws Exception
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
similarity index 91%
rename from asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
index 53a3cbd..74589cc 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndexImmutableProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/MetadataIndexImmutableProperties.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.metadata.bootstrap;
+package org.apache.asterix.common.metadata;
 
 public class MetadataIndexImmutableProperties {
     private final String indexName;
@@ -45,7 +45,7 @@
         return datasetId;
     }
 
-    // Right now, we only have primary indexes. Hence, dataset name is always index name
+    // Right now, we only have primary Metadata indexes. Hence, dataset name is always index name
     public String getDatasetName() {
         return indexName;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixResourceIdManager.java
similarity index 72%
copy from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
copy to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixResourceIdManager.java
index daeb9c4..06038cd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAsterixResourceIdManager.java
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.common.transactions;
 
-public class ResourceIdRequestMessage extends AbstractApplicationMessage {
-    private static final long serialVersionUID = 1L;
+public interface IAsterixResourceIdManager {
 
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.RESOURCE_ID_REQUEST;
-    }
+    long createResourceId();
+
+    boolean reported(String nodeId);
+
+    void report(String nodeId, long maxResourceId);
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index b49a719..e4f21a6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -23,8 +23,8 @@
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 
 public interface IExternalDataSourceFactory extends Serializable {
@@ -73,11 +73,11 @@
             AlgebricksAbsolutePartitionConstraint constraints, int count) {
         if (constraints == null) {
             ArrayList<String> locs = new ArrayList<String>();
-            Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
+            Map<String, String[]> stores = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getStores();
             int i = 0;
             while (i < count) {
                 for (String node : stores.keySet()) {
-                    int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(node);
+                    int numIODevices = AsterixClusterProperties.INSTANCE.getIODevices(node).length;
                     for (int k = 0; k < numIODevices; k++) {
                         locs.add(node);
                         i++;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index cf4ed19..87ee167 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.dataflow;
 
-import javax.annotation.Nonnull;
-
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -34,7 +32,7 @@
     protected final FeedLogManager feedLogManager;
 
     public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
-            @Nonnull FeedLogManager feedLogManager, int numOfFields) {
+            FeedLogManager feedLogManager, int numOfFields) {
         this.feedLogManager = feedLogManager;
         this.numOfFields = numOfFields;
         this.ctx = ctx;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 10e9125..859a011 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -25,14 +25,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.annotation.Nonnull;
-
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.external.api.IFeedMarker;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
@@ -57,8 +55,8 @@
     private Future<?> dataflowMarkerResult;
 
     public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
-            @Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
-            @Nonnull IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
+            FeedLogManager feedLogManager, int numOfOutputFields, IRecordDataParser<T> dataParser,
+            IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
@@ -101,13 +99,13 @@
         try {
             tupleForwarder.close();
         } catch (Throwable th) {
-            hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
+            hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
         }
         try {
             recordReader.close();
         } catch (Throwable th) {
             LOGGER.warn("Failure during while operating a feed sourcec", th);
-            hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
+            hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
         } finally {
             closeSignal();
             if (sendMarker && dataflowMarkerResult != null) {
@@ -182,12 +180,12 @@
                 try {
                     tupleForwarder.close();
                 } catch (Throwable th) {
-                    hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
+                    hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
                 }
                 try {
                     recordReader.close();
                 } catch (Throwable th) {
-                    hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
+                    hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
                 }
                 if (hde != null) {
                     throw hde;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index 36c6c2f..b8d0532 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -20,8 +20,6 @@
 
 import java.io.IOException;
 
-import javax.annotation.Nonnull;
-
 import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.external.util.FeedLogManager;
@@ -46,7 +44,7 @@
     private boolean paused = false;
     private boolean initialized;
 
-    public FeedTupleForwarder(@Nonnull FeedLogManager feedLogManager) {
+    public FeedTupleForwarder(FeedLogManager feedLogManager) {
         this.feedLogManager = feedLogManager;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
index 9c8563d..2b06775 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/IndexingDataFlowController.java
@@ -20,8 +20,6 @@
 
 import java.io.IOException;
 
-import javax.annotation.Nonnull;
-
 import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
@@ -34,7 +32,7 @@
     private final IExternalIndexer indexer;
 
     public IndexingDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
-            @Nonnull IRecordDataParser<T> dataParser, @Nonnull IRecordReader<? extends T> recordReader,
+            IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader,
             IExternalIndexer indexer) throws IOException {
         super(ctx, tupleForwarder, dataParser, recordReader, 1 + indexer.getNumberOfFields());
         this.indexer = indexer;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
index 915c343..e780834 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RecordDataFlowController.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.dataflow;
 
-import javax.annotation.Nonnull;
-
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
@@ -38,7 +36,7 @@
     protected final int numOfTupleFields;
 
     public RecordDataFlowController(IHyracksTaskContext ctx, ITupleForwarder tupleForwarder,
-            @Nonnull IRecordDataParser<T> dataParser, @Nonnull IRecordReader<? extends T> recordReader,
+            IRecordDataParser<T> dataParser, IRecordReader<? extends T> recordReader,
             int numOfTupleFields) {
         super(ctx, tupleForwarder);
         this.dataParser = dataParser;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 7f2191c..0c49c92 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -43,7 +43,7 @@
 import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
 import org.apache.asterix.external.util.FeedUtils.JobType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
@@ -202,7 +202,7 @@
             }
         }
 
-        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
         JobInfo info = hcc.getJobInfo(intakeJobInfo.getJobId());
         List<String> intakeLocations = new ArrayList<>();
         for (OperatorDescriptorId intakeOperatorId : intakeOperatorIds) {
@@ -349,7 +349,7 @@
 
     private synchronized void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, ActiveEvent message)
             throws Exception {
-        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
         JobInfo info = hcc.getJobInfo(message.getJobId());
         JobStatus status = info.getStatus();
         EntityId feedId = intakeInfo.getFeedId();
@@ -369,7 +369,7 @@
     private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
         FeedConnectionId connectionId = cInfo.getConnectionId();
 
-        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+        IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
         JobInfo info = hcc.getJobInfo(cInfo.getJobId());
         JobStatus status = info.getStatus();
         boolean failure = status != null && status.equals(JobStatus.FAILURE);
@@ -523,7 +523,7 @@
         }
 
         try {
-            IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
+            IHyracksClientConnection hcc = AsterixAppContextInfo.INSTANCE.getHcc();
             JobInfo info = hcc.getJobInfo(cInfo.getJobId());
             List<String> collectLocations = new ArrayList<>();
             for (OperatorDescriptorId collectOpId : collectOperatorIds) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
index 6c04b2d..2adce1c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.external.feed.runtime;
 
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.log4j.Logger;
 
@@ -58,12 +57,7 @@
                 continueIngestion = false;
             } catch (Exception e) {
                 LOGGER.error("Exception during feed ingestion ", e);
-                // Check if the adapter wants to continue ingestion
-                if (ExternalDataExceptionUtils.isResolvable(e)) {
-                    continueIngestion = adapter.handleException(e);
-                } else {
-                    continueIngestion = false;
-                }
+                continueIngestion = adapter.handleException(e);
                 failedIngestion = !continueIngestion;
             }
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
index 88964a1..006bf0b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/QuotedLineRecordReader.java
@@ -20,10 +20,9 @@
 
 import java.io.IOException;
 
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class QuotedLineRecordReader extends LineRecordReader {
@@ -36,7 +35,7 @@
             throws HyracksDataException {
         super(hasHeader, stream);
         if ((quoteString == null) || (quoteString.length() != 1)) {
-            throw new HyracksDataException(ExternalDataExceptionUtils.incorrectParameterMessage(
+            throw new HyracksDataException(ExceptionUtils.incorrectParameterMessage(
                     ExternalDataConstants.KEY_QUOTE, ExternalDataConstants.PARAMETER_OF_SIZE_ONE, quoteString));
         }
         this.quote = quoteString.charAt(0);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index 26ac3cb..4d6d004 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -20,10 +20,9 @@
 
 import java.io.IOException;
 
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IExternalIndexer;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class SemiStructuredRecordReader extends StreamRecordReader {
@@ -42,7 +41,7 @@
         if (recStartString != null) {
             if (recStartString.length() != 1) {
                 throw new HyracksDataException(
-                        ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
+                        ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_START,
                                 ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recStartString));
             }
             recordStart = recStartString.charAt(0);
@@ -53,7 +52,7 @@
         if (recEndString != null) {
             if (recEndString.length() != 1) {
                 throw new HyracksDataException(
-                        ExternalDataExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
+                        ExceptionUtils.incorrectParameterMessage(ExternalDataConstants.KEY_RECORD_END,
                                 ExternalDataConstants.PARAMETER_OF_SIZE_ONE, recEndString));
             }
             recordEnd = recEndString.charAt(0);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index 26f8654..7995091 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -23,8 +23,8 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.util.ExternalDataExceptionUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.log4j.Logger;
 
@@ -116,14 +116,14 @@
             }
             socket = null;
         } catch (IOException e) {
-            hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
         }
         try {
             if (server != null) {
                 server.close();
             }
         } catch (IOException e) {
-            hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
         } finally {
             server = null;
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
index f63b895..aafee8e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/SocketServerInputStreamFactory.java
@@ -32,7 +32,7 @@
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.input.stream.SocketServerInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.om.util.AsterixRuntimeUtil;
+import org.apache.asterix.runtime.util.RuntimeUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -64,8 +64,8 @@
                         "\'sockets\' parameter not specified as part of adapter configuration");
             }
             Map<InetAddress, Set<String>> ncMap;
-            ncMap = AsterixRuntimeUtil.getNodeControllerMap();
-            List<String> ncs = AsterixRuntimeUtil.getAllNodeControllers();
+            ncMap = RuntimeUtils.getNodeControllerMap();
+            List<String> ncs = RuntimeUtils.getAllNodeControllers();
             String[] socketsArray = socketsValue.split(",");
             Random random = new Random();
             for (String socket : socketsArray) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index 8052822..5575d2c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -26,7 +26,7 @@
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
index b0594d2..789e9ba 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalFunction.java
@@ -31,7 +31,7 @@
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -70,7 +70,7 @@
         ILibraryManager libraryManager;
         if (context == null) {
             // Gets the library manager for compile-time constant folding.
-            libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager();
+            libraryManager = AsterixAppContextInfo.INSTANCE.getLibraryManager();
         } else {
             // Gets the library manager for real runtime evaluation.
             IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) context.getJobletContext()
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index e98e744..ebe3276 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -20,8 +20,6 @@
 
 import java.util.Map;
 
-import javax.annotation.Nonnull;
-
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.external.api.IDataParserFactory;
@@ -57,7 +55,7 @@
     }
 
     @SuppressWarnings("rawtypes")
-    public static IDataParserFactory getDataParserFactory(@Nonnull String parser) throws AsterixException {
+    public static IDataParserFactory getDataParserFactory(String parser) throws AsterixException {
         switch (parser) {
             case ExternalDataConstants.FORMAT_ADM:
             case ExternalDataConstants.FORMAT_JSON:
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 9c378a0..44980c8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -27,7 +27,7 @@
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 936ada5..0848532 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -29,8 +29,8 @@
 import org.apache.asterix.external.indexing.IndexingScheduler;
 import org.apache.asterix.external.indexing.RecordId.RecordIdType;
 import org.apache.asterix.external.input.stream.HDFSInputStream;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,7 +49,7 @@
 public class HDFSUtils {
 
     public static Scheduler initializeHDFSScheduler() {
-        ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+        ICCContext ccContext = AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getCCContext();
         Scheduler scheduler = null;
         try {
             scheduler = new Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
@@ -61,7 +61,7 @@
     }
 
     public static IndexingScheduler initializeIndexingHDFSScheduler() {
-        ICCContext ccContext = AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext();
+        ICCContext ccContext = AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getCCContext();
         IndexingScheduler scheduler = null;
         try {
             scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
@@ -80,6 +80,7 @@
      * 1. NoOp means appended file
      * 2. AddOp means new file
      * 3. UpdateOp means the delta of a file
+     *
      * @return
      * @throws IOException
      */
@@ -109,7 +110,7 @@
                                 .add(new FileSplit(filePath,
                                         block.getOffset(), (block.getLength() + block.getOffset()) < file.getSize()
                                                 ? block.getLength() : (file.getSize() - block.getOffset()),
-                                block.getHosts()));
+                                        block.getHosts()));
                         orderedExternalFiles.add(file);
                     }
                 }
@@ -201,17 +202,17 @@
     public static AlgebricksAbsolutePartitionConstraint getPartitionConstraints(
             AlgebricksAbsolutePartitionConstraint clusterLocations) {
         if (clusterLocations == null) {
-            ArrayList<String> locs = new ArrayList<String>();
-            Map<String, String[]> stores = AsterixAppContextInfo.getInstance().getMetadataProperties().getStores();
-            for (String i : stores.keySet()) {
-                int numIODevices = AsterixClusterProperties.INSTANCE.getNumberOfIODevices(i);
+            ArrayList<String> locs = new ArrayList<>();
+            Map<String, String[]> stores = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getStores();
+            for (String node : stores.keySet()) {
+                int numIODevices = AsterixClusterProperties.INSTANCE.getIODevices(node).length;
                 for (int k = 0; k < numIODevices; k++) {
-                    locs.add(i);
+                    locs.add(node);
                 }
             }
             String[] cluster = new String[locs.size()];
             cluster = locs.toArray(cluster);
-            clusterLocations = new AlgebricksAbsolutePartitionConstraint(cluster);
+            return new AlgebricksAbsolutePartitionConstraint(cluster);
         }
         return clusterLocations;
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
index 61764d7..52c2850 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
@@ -29,7 +29,7 @@
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.INodeResolver;
-import org.apache.asterix.om.util.AsterixRuntimeUtil;
+import org.apache.asterix.runtime.util.RuntimeUtils;
 
 /**
  * Resolves a value (DNS/IP Address) or a (Node Controller Id) to the id of a Node Controller running at the location.
@@ -86,7 +86,7 @@
     private static void updateNCs() throws Exception {
         synchronized (ncMap) {
             ncMap.clear();
-            AsterixRuntimeUtil.getNodeControllerMap(ncMap);
+            RuntimeUtils.getNodeControllerMap(ncMap);
             synchronized (ncs) {
                 ncs.clear();
                 for (Entry<InetAddress, Set<String>> entry : ncMap.entrySet()) {
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
index f5afef6..61f5021 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -22,7 +22,7 @@
 
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 182cbda..c929b41 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
@@ -52,7 +53,6 @@
 import org.apache.asterix.metadata.api.IMetadataIndex;
 import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.api.IValueExtractor;
-import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
 import org.apache.asterix.metadata.entities.CompactionPolicy;
 import org.apache.asterix.metadata.entities.Dataset;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/ExtensionMetadataDataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/ExtensionMetadataDataset.java
index d3f7e1c..d25f488 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/ExtensionMetadataDataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/ExtensionMetadataDataset.java
@@ -20,8 +20,8 @@
 
 import java.util.List;
 
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.metadata.bootstrap.MetadataIndex;
-import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 616c724..2629fea 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -67,8 +67,8 @@
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
index 6c8bebd..db6145a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
@@ -24,6 +24,7 @@
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlBinaryHashFunctionFactoryProvider;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
index 8201627..833f3e5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataPrimaryIndexes.java
@@ -21,6 +21,7 @@
 
 import java.util.Arrays;
 
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.metadata.api.IMetadataIndex;
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.om.types.BuiltinType;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
index e7bf3bf..f076152 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
@@ -45,8 +45,8 @@
 import org.apache.asterix.event.util.PatternCreator;
 import org.apache.asterix.installer.schema.conf.Configuration;
 import org.apache.asterix.metadata.api.IClusterManager;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 
 public class ClusterManager implements IClusterManager {
 
@@ -101,13 +101,13 @@
         try {
             Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
             List<Pattern> pattern = new ArrayList<Pattern>();
-            String asterixInstanceName = AsterixAppContextInfo.getInstance().getMetadataProperties().getInstanceName();
+            String asterixInstanceName = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getInstanceName();
             Patterns prepareNode = PatternCreator.INSTANCE.createPrepareNodePattern(asterixInstanceName,
                     AsterixClusterProperties.INSTANCE.getCluster(), node);
             cluster.getNode().add(node);
             client.submit(prepareNode);
 
-            AsterixExternalProperties externalProps = AsterixAppContextInfo.getInstance().getExternalProperties();
+            AsterixExternalProperties externalProps = AsterixAppContextInfo.INSTANCE.getExternalProperties();
             AsterixEventServiceUtil.poulateClusterEnvironmentProperties(cluster, externalProps.getCCJavaParams(),
                     externalProps.getNCJavaParams());
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
index 9e65d62..b5e6449 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
@@ -21,7 +21,7 @@
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.asterix.om.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
 /**
@@ -113,7 +113,7 @@
                 if (intValue < 0) {
                     return new Pair<Boolean, String>(false, "Value must be >= 0");
                 }
-                int numNodesInCluster = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames()
+                int numNodesInCluster = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodeNames()
                         .size();
                 if (numNodesInCluster < intValue) {
                     return new Pair<Boolean, String>(false,
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 0975163..d8ac7b9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -85,14 +85,15 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixAppContextInfo;
-import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.om.util.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.AsterixLSMInvertedIndexUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.operators.AsterixLSMTreeUpsertOperatorDescriptor;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
@@ -103,7 +104,6 @@
 import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -187,8 +187,8 @@
 
     public AqlMetadataProvider(Dataverse defaultDataverse) {
         this.defaultDataverse = defaultDataverse;
-        this.storageProperties = AsterixAppContextInfo.getInstance().getStorageProperties();
-        this.libraryManager = AsterixAppContextInfo.getInstance().getLibraryManager();
+        this.storageProperties = AsterixAppContextInfo.INSTANCE.getStorageProperties();
+        this.libraryManager = AsterixAppContextInfo.INSTANCE.getLibraryManager();
     }
 
     public String getPropertyValue(String propertyName) {
@@ -489,10 +489,11 @@
                 }
                 Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits =
                         getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex(
-                        secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
-                        DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
-                        dataset.hasMetaPart(), primaryKeyIndicators, secondaryIndex.getKeyFieldSourceIndicators(),
-                        metaType);
+                                secondaryIndex.getKeyFieldNames(), secondaryIndex.getKeyFieldTypes(),
+                                DatasetUtils.getPartitioningKeys(dataset), itemType, dataset.getDatasetType(),
+                                dataset.hasMetaPart(), primaryKeyIndicators,
+                                secondaryIndex.getKeyFieldSourceIndicators(),
+                                metaType);
                 comparatorFactories = comparatorFactoriesAndTypeTraits.first;
                 typeTraits = comparatorFactoriesAndTypeTraits.second;
                 if (filterTypeTraits != null) {
@@ -569,12 +570,12 @@
                 int[] buddyBreeFields = new int[] { numSecondaryKeys };
                 ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory =
                         new ExternalBTreeWithBuddyDataflowHelperFactory(
-                        compactionInfo.first, compactionInfo.second,
-                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
-                        getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
-                        ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
+                                compactionInfo.first, compactionInfo.second,
+                                new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE,
+                                getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields,
+                                ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
                 btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
                         rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
                         highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
@@ -2245,4 +2246,4 @@
             throw new AlgebricksException("Only record types can be indexed.");
         }
     }
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index d74bf9f..5853a39 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -32,8 +32,8 @@
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
index 0b22dab..3dfa8a5 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
@@ -36,7 +36,7 @@
 import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index a31f2a6..85e4405 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -28,7 +28,7 @@
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index d247490..5a88a9f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -59,6 +59,7 @@
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
@@ -69,7 +70,6 @@
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 5ba6ad2..53bd164 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -65,6 +65,7 @@
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
@@ -75,7 +76,6 @@
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.replication.IReplicationJob;
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 4da5fd4..46a2076 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -38,9 +38,9 @@
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.replication.storage.ReplicaResourcesManager;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 
 public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 41fc0b8..01d6db7 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -40,10 +40,10 @@
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
similarity index 91%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
index 4aba0c2..2eb1be9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/AbstractFailbackPlanMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/AbstractFailbackPlanMessage.java
@@ -16,7 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 
 public abstract class AbstractFailbackPlanMessage extends AbstractApplicationMessage {
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
new file mode 100644
index 0000000..c96bcb8
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(CompleteFailbackRequestMessage.class.getName());
+    private final Set<Integer> partitions;
+    private final String nodeId;
+
+    public CompleteFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.nodeId = nodeId;
+        this.partitions = partitions;
+    }
+
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Plan ID: " + planId);
+        sb.append(" Node ID: " + nodeId);
+        sb.append(" Partitions: " + partitions);
+        return sb.toString();
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext appContext =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        HyracksDataException hde = null;
+        try {
+            IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
+            remoteRecoeryManager.completeFailbackProcess();
+        } catch (IOException | InterruptedException e) {
+            LOGGER.log(Level.SEVERE, "Failure during completion of failback process", e);
+            hde = ExceptionUtils.convertToHyracksDataException(e);
+        } finally {
+            CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(planId,
+                    requestId, partitions);
+            try {
+                broker.sendMessageToCC(reponse, null);
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Failure sending message to CC", e);
+                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            }
+        }
+        if (hde != null) {
+            throw hde;
+        }
+    }
+
+    @Override
+    public String type() {
+        return "COMPLETE_FAILBACK_REQUEST";
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
similarity index 75%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
index a036ff0..3c4b58c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CompleteFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
@@ -16,10 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
 
 import java.util.Set;
 
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
 public class CompleteFailbackResponseMessage extends AbstractFailbackPlanMessage {
 
     private static final long serialVersionUID = 1L;
@@ -30,11 +34,6 @@
         this.partitions = partitions;
     }
 
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.COMPLETE_FAILBACK_RESPONSE;
-    }
-
     public Set<Integer> getPartitions() {
         return partitions;
     }
@@ -46,4 +45,14 @@
         sb.append(" Partitions: " + partitions);
         return sb.toString();
     }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        AsterixClusterProperties.INSTANCE.processCompleteFailbackResponse(this);
+    }
+
+    @Override
+    public String type() {
+        return "COMPLETE_FAILBACK_RESPONSE";
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
similarity index 96%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
index 0591644..24f5fe8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/NodeFailbackPlan.java
@@ -16,16 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.replication;
+package org.apache.asterix.runtime.message;
 
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
-
 public class NodeFailbackPlan {
 
     public enum FailbackPlanState {
@@ -107,7 +104,7 @@
                     }
                 }
 
-                if (failedRequests.size() > 0) {
+                if (!failedRequests.isEmpty()) {
                     state = FailbackPlanState.FAILED;
                     for (Integer failedRequestId : failedRequests) {
                         markRequestCompleted(failedRequestId);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
new file mode 100644
index 0000000..e3b9fbe
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.rmi.RemoteException;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(PreparePartitionsFailbackRequestMessage.class.getName());
+    private final Set<Integer> partitions;
+    private boolean releaseMetadataNode = false;
+    private final String nodeID;
+
+    public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.nodeID = nodeId;
+        this.partitions = partitions;
+    }
+
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    public boolean isReleaseMetadataNode() {
+        return releaseMetadataNode;
+    }
+
+    public void setReleaseMetadataNode(boolean releaseMetadataNode) {
+        this.releaseMetadataNode = releaseMetadataNode;
+    }
+
+    public String getNodeID() {
+        return nodeID;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Plan ID: " + planId);
+        sb.append(" Partitions: " + partitions);
+        sb.append(" releaseMetadataNode: " + releaseMetadataNode);
+        return sb.toString();
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext appContext =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        /**
+         * if the metadata partition will be failed back
+         * we need to flush and close all datasets including metadata datasets
+         * otherwise we need to close all non-metadata datasets and flush metadata datasets
+         * so that their memory components will be copied to the failing back node
+         */
+        if (releaseMetadataNode) {
+            appContext.getDatasetLifecycleManager().closeAllDatasets();
+            //remove the metadata node stub from RMI registry
+            try {
+                appContext.unexportMetadataNodeStub();
+            } catch (RemoteException e) {
+                LOGGER.log(Level.SEVERE, "Failed unexporting metadata stub", e);
+                throw ExceptionUtils.convertToHyracksDataException(e);
+            }
+        } else {
+            //close all non-metadata datasets
+            appContext.getDatasetLifecycleManager().closeUserDatasets();
+            //flush the remaining metadata datasets that were not closed
+            appContext.getDatasetLifecycleManager().flushAllDatasets();
+        }
+
+        //mark the partitions to be closed as inactive
+        PersistentLocalResourceRepository localResourceRepo = (PersistentLocalResourceRepository) appContext
+                .getLocalResourceRepository();
+        for (Integer partitionId : partitions) {
+            localResourceRepo.addInactivePartition(partitionId);
+        }
+
+        //send response after partitions prepared for failback
+        PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(planId,
+                requestId, partitions);
+        try {
+            broker.sendMessageToCC(reponse, null);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    @Override
+    public String type() {
+        return "PREPARE_PARTITIONS_FAILBACK_REQUEST";
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
similarity index 72%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
index 467c6cb..2e52773 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
@@ -16,10 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
 
 import java.util.Set;
 
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
 public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage {
 
     private static final long serialVersionUID = 1L;
@@ -30,12 +34,17 @@
         this.partitions = partitions;
     }
 
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.PREPARE_PARTITIONS_FAILBACK_RESPONSE;
-    }
-
     public Set<Integer> getPartitions() {
         return partitions;
     }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        AsterixClusterProperties.INSTANCE.processPreparePartitionsFailbackResponse(this);
+    }
+
+    @Override
+    public String type() {
+        return "PREPARE_PARTITIONS_FAILBACK_RESPONSE";
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
new file mode 100644
index 0000000..2aa4746
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ReplicaEventMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final ClusterEventType event;
+    private final String nodeIPAddress;
+
+    public ReplicaEventMessage(String nodeId, String nodeIPAddress, ClusterEventType event) {
+        this.nodeId = nodeId;
+        this.nodeIPAddress = nodeIPAddress;
+        this.event = event;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public ClusterEventType getEvent() {
+        return event;
+    }
+
+    public String getNodeIPAddress() {
+        return nodeIPAddress;
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext appContext =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        Node node = new Node();
+        node.setId(nodeId);
+        node.setClusterIp(nodeIPAddress);
+        Replica replica = new Replica(node);
+        appContext.getReplicationManager().reportReplicaEvent(new ReplicaEvent(replica, event));
+    }
+
+    @Override
+    public String type() {
+        return "REPLICA_EVENT";
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
new file mode 100644
index 0000000..1604e29
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class ReportMaxResourceIdMessage extends AbstractApplicationMessage {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(ReportMaxResourceIdMessage.class.getName());
+    private final long maxResourceId;
+    private final String src;
+
+    public ReportMaxResourceIdMessage(String src, long maxResourceId) {
+        this.src = src;
+        this.maxResourceId = maxResourceId;
+    }
+
+    public long getMaxResourceId() {
+        return maxResourceId;
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        IAsterixResourceIdManager resourceIdManager =
+                AsterixAppContextInfo.INSTANCE.getResourceIdManager();
+        resourceIdManager.report(src, maxResourceId);
+    }
+
+    public static void send(NodeControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = cs;
+        IAsterixAppRuntimeContext appContext =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
+                MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
+        ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(ncs.getId(), maxResourceId);
+        try {
+            ((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg, null);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    @Override
+    public String type() {
+        return "REPORT_MAX_RESOURCE_ID_RESPONSE";
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
similarity index 64%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
index d2837ce..3e2becf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReportMaxResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
@@ -16,22 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
 
 public class ReportMaxResourceIdRequestMessage extends AbstractApplicationMessage {
     private static final long serialVersionUID = 1L;
-    public long maxResourceId;
 
     @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.REPORT_MAX_RESOURCE_ID_REQUEST;
+    public void handle(IControllerService cs) throws HyracksDataException {
+        ReportMaxResourceIdMessage.send((NodeControllerService) cs);
     }
 
-    public long getMaxResourceId() {
-        return maxResourceId;
-    }
-
-    public void setMaxResourceId(long maxResourceId) {
-        this.maxResourceId = maxResourceId;
+    @Override
+    public String type() {
+        return "REPORT_MAX_RESOURCE_ID_REQUEST";
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
new file mode 100644
index 0000000..5e7d808
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+public class ResourceIdRequestMessage extends AbstractApplicationMessage {
+    private static final long serialVersionUID = 1L;
+    private final String src;
+
+    public ResourceIdRequestMessage(String src) {
+        this.src = src;
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        try {
+            ICCMessageBroker broker =
+                    (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
+            ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage();
+            reponse.setId(id);
+            if (!AsterixClusterProperties.INSTANCE.isClusterActive()) {
+                reponse.setResourceId(-1);
+                reponse.setException(new Exception("Cannot generate global resource id when cluster is not active."));
+            } else {
+                IAsterixResourceIdManager resourceIdManager =
+                        AsterixAppContextInfo.INSTANCE.getResourceIdManager();
+                reponse.setResourceId(resourceIdManager.createResourceId());
+                if (reponse.getResourceId() < 0) {
+                    reponse.setException(new Exception("One or more nodes has not reported max resource id."));
+                }
+                requestMaxResourceID(resourceIdManager, broker);
+            }
+            broker.sendApplicationMessageToNC(reponse, src);
+        } catch (Exception e) {
+            throw ExceptionUtils.convertToHyracksDataException(e);
+        }
+    }
+
+    private void requestMaxResourceID(IAsterixResourceIdManager resourceIdManager, ICCMessageBroker broker)
+            throws Exception {
+        Set<String> getParticipantNodes = AsterixClusterProperties.INSTANCE.getParticipantNodes();
+        ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage();
+        msg.setId(ICCMessageBroker.NO_CALLBACK_MESSAGE_ID);
+        for (String nodeId : getParticipantNodes) {
+            if (!resourceIdManager.reported(nodeId)) {
+                broker.sendApplicationMessageToNC(msg, nodeId);
+            }
+        }
+    }
+
+    @Override
+    public String type() {
+        return "RESOURCE_ID_REQUEST";
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
similarity index 69%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
index 09c50d3..62c2163 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
@@ -16,7 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
 
 public class ResourceIdRequestResponseMessage extends AbstractApplicationMessage {
     private static final long serialVersionUID = 1L;
@@ -24,11 +28,6 @@
     private long resourceId;
     private Exception exception;
 
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.RESOURCE_ID_RESPONSE;
-    }
-
     public long getResourceId() {
         return resourceId;
     }
@@ -44,4 +43,15 @@
     public void setException(Exception exception) {
         this.exception = exception;
     }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        // Do nothing. for this message, the callback handles it, we probably should get rid of callbacks and
+        // instead, use the handle in the response to perform callback action
+    }
+
+    @Override
+    public String type() {
+        return "RESOURCE_ID_RESPONSE";
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
new file mode 100644
index 0000000..b18a879
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext appContext =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        HyracksDataException hde = null;
+        try {
+            appContext.initializeMetadata(false);
+            appContext.exportMetadataNodeStub();
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
+            hde = new HyracksDataException(e);
+        } finally {
+            TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
+                    appContext.getTransactionSubsystem().getId());
+            try {
+                broker.sendMessageToCC(reponse, null);
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
+                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            }
+        }
+        if (hde != null) {
+            throw hde;
+        }
+    }
+
+    @Override
+    public String type() {
+        return "TAKEOVER_METADATA_NODE_REQUEST";
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
similarity index 67%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
index bc98a62..f7016bc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
@@ -16,7 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
 
 public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
 
@@ -27,12 +32,17 @@
         this.nodeId = nodeId;
     }
 
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.TAKEOVER_METADATA_NODE_RESPONSE;
-    }
-
     public String getNodeId() {
         return nodeId;
     }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        AsterixClusterProperties.INSTANCE.processMetadataNodeTakeoverResponse(this);
+    }
+
+    @Override
+    public String type() {
+        return "TAKEOVER_METADATA_NODE_RESPONSE";
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
new file mode 100644
index 0000000..3b91084
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.message;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.nc.NodeControllerService;
+
+public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(TakeoverPartitionsRequestMessage.class.getName());
+    private final Integer[] partitions;
+    private final long requestId;
+    private final String nodeId;
+
+    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.partitions = partitionsToTakeover;
+    }
+
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    public long getRequestId() {
+        return requestId;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Request ID: " + requestId);
+        sb.append(" Node ID: " + nodeId);
+        sb.append(" Partitions: ");
+        for (Integer partitionId : partitions) {
+            sb.append(partitionId + ",");
+        }
+        //remove last comma
+        sb.charAt(sb.length() - 1);
+        return sb.toString();
+    }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        NodeControllerService ncs = (NodeControllerService) cs;
+        IAsterixAppRuntimeContext appContext =
+                (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        INCMessageBroker broker = (INCMessageBroker) ncs.getApplicationContext().getMessageBroker();
+        //if the NC is shutting down, it should ignore takeover partitions request
+        if (!appContext.isShuttingdown()) {
+            HyracksDataException hde = null;
+            try {
+                IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
+                remoteRecoeryManager.takeoverPartitons(partitions);
+            } catch (IOException | ACIDException e) {
+                LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
+                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            } finally {
+                //send response after takeover is completed
+                TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(requestId,
+                        appContext.getTransactionSubsystem().getId(), partitions);
+                try {
+                    broker.sendMessageToCC(reponse, null);
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
+                    hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+                }
+            }
+            if (hde != null) {
+                throw hde;
+            }
+        }
+    }
+
+    @Override
+    public String type() {
+        return "TAKEOVER_PARTITIONS_REQUEST";
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
similarity index 72%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
index 54597d9..9ec71b7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
@@ -16,7 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.messaging;
+package org.apache.asterix.runtime.message;
+
+import org.apache.asterix.common.messaging.AbstractApplicationMessage;
+import org.apache.asterix.runtime.util.AsterixClusterProperties;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
 
 public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
 
@@ -31,11 +36,6 @@
         this.partitions = partitionsToTakeover;
     }
 
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.TAKEOVER_PARTITIONS_RESPONSE;
-    }
-
     public Integer[] getPartitions() {
         return partitions;
     }
@@ -47,4 +47,14 @@
     public long getRequestId() {
         return requestId;
     }
+
+    @Override
+    public void handle(IControllerService cs) throws HyracksDataException {
+        AsterixClusterProperties.INSTANCE.processPartitionTakeoverResponse(this);
+    }
+
+    @Override
+    public String type() {
+        return "TAKEOVER_PARTITIONS_RESPONSE";
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
similarity index 82%
rename from asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index ab1ebe1..4cb65c2 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -16,30 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.transaction.management.resource;
+package org.apache.asterix.runtime.transaction;
 
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
-import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
+import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
 import org.apache.hyracks.api.application.IApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 /**
- * A resource id factory that generates unique resource ids across all NCs by requesting unique ids from the cluster controller.
+ * A resource id factory that generates unique resource ids across all NCs by requesting
+ * unique ids from the cluster controller.
  */
 public class GlobalResourceIdFactory implements IResourceIdFactory, IApplicationMessageCallback {
 
     private final IApplicationContext appCtx;
     private final LinkedBlockingQueue<IApplicationMessage> resourceIdResponseQ;
+    private final String nodeId;
 
     public GlobalResourceIdFactory(IApplicationContext appCtx) {
         this.appCtx = appCtx;
         this.resourceIdResponseQ = new LinkedBlockingQueue<>();
+        this.nodeId = ((NodeControllerService) appCtx.getControllerService()).getApplicationContext().getNodeId();
     }
 
     @Override
@@ -47,16 +51,16 @@
         try {
             ResourceIdRequestResponseMessage reponse = null;
             //if there already exists a response, use it
-            if (resourceIdResponseQ.size() > 0) {
+            if (!resourceIdResponseQ.isEmpty()) {
                 synchronized (resourceIdResponseQ) {
-                    if (resourceIdResponseQ.size() > 0) {
+                    if (!resourceIdResponseQ.isEmpty()) {
                         reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
                     }
                 }
             }
             //if no response available or it has an exception, request a new one
             if (reponse == null || reponse.getException() != null) {
-                ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
+                ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId);
                 ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg, this);
                 reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
                 if (reponse.getException() != null) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java
similarity index 94%
rename from asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java
index 7bcf379..9027f75 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactoryProvider.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactoryProvider.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.transaction.management.resource;
+package org.apache.asterix.runtime.transaction;
 
 import org.apache.hyracks.api.application.IApplicationContext;
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
similarity index 84%
rename from asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
index e7b15a2..4f2cebd 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.om.util;
+package org.apache.asterix.runtime.util;
 
 import java.io.IOException;
 import java.util.logging.Logger;
@@ -37,7 +37,7 @@
 import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
 import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -51,9 +51,11 @@
  */
 public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IAsterixPropertiesProvider {
 
-    private static AsterixAppContextInfo INSTANCE;
-    private final ICCApplicationContext appCtx;
-
+    public static final AsterixAppContextInfo INSTANCE = new AsterixAppContextInfo();
+    private ICCApplicationContext appCtx;
+    private IGlobalRecoveryMaanger globalRecoveryMaanger;
+    private ILibraryManager libraryManager;
+    private IAsterixResourceIdManager resourceIdManager;
     private AsterixCompilerProperties compilerProperties;
     private AsterixExternalProperties externalProperties;
     private AsterixMetadataProperties metadataProperties;
@@ -64,19 +66,26 @@
     private AsterixReplicationProperties replicationProperties;
     private AsterixExtensionProperties extensionProperties;
     private MessagingProperties messagingProperties;
-    private final IGlobalRecoveryMaanger globalRecoveryMaanger;
     private IHyracksClientConnection hcc;
-    private final ILibraryManager libraryManager;
     private Object extensionManager;
+    private volatile boolean initialized = false;
 
-    public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
-            IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager)
+    private AsterixAppContextInfo() {
+    }
+
+    public static synchronized void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
+            IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager,
+            IAsterixResourceIdManager resourceIdManager)
             throws AsterixException, IOException {
-        if (INSTANCE != null) {
-            return;
+        if (INSTANCE.initialized) {
+            throw new AsterixException(AsterixAppContextInfo.class.getSimpleName() + " has been initialized already");
         }
-        INSTANCE = new AsterixAppContextInfo(ccAppCtx, hcc, globalRecoveryMaanger, libraryManager);
-
+        INSTANCE.initialized = true;
+        INSTANCE.appCtx = ccAppCtx;
+        INSTANCE.hcc = hcc;
+        INSTANCE.globalRecoveryMaanger = globalRecoveryMaanger;
+        INSTANCE.libraryManager = libraryManager;
+        INSTANCE.resourceIdManager = resourceIdManager;
         // Determine whether to use old-style asterix-configuration.xml or new-style configuration.
         // QQQ strip this out eventually
         AsterixPropertiesAccessor propertiesAccessor;
@@ -102,16 +111,8 @@
         Logger.getLogger("org.apache").setLevel(INSTANCE.externalProperties.getLogLevel());
     }
 
-    private AsterixAppContextInfo(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
-            IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager) {
-        this.appCtx = ccAppCtx;
-        this.hcc = hcc;
-        this.globalRecoveryMaanger = globalRecoveryMaanger;
-        this.libraryManager = libraryManager;
-    }
-
-    public static AsterixAppContextInfo getInstance() {
-        return INSTANCE;
+    public boolean initialized() {
+        return initialized;
     }
 
     @Override
@@ -199,4 +200,8 @@
     public MessagingProperties getMessagingProperties() {
         return messagingProperties;
     }
+
+    public IAsterixResourceIdManager getResourceIdManager() {
+        return resourceIdManager;
+    }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
similarity index 81%
rename from asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
index 5acf9ae..2457ddc 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixClusterProperties.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.om.util;
+package org.apache.asterix.runtime.util;
 
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -26,6 +26,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.logging.Level;
@@ -38,20 +39,20 @@
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
-import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
-import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
-import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.common.messaging.ReplicaEventMessage;
-import org.apache.asterix.common.messaging.TakeoverMetadataNodeRequestMessage;
-import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
-import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.replication.NodeFailbackPlan;
-import org.apache.asterix.common.replication.NodeFailbackPlan.FailbackPlanState;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
+import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
+import org.apache.asterix.runtime.message.NodeFailbackPlan;
+import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
+import org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
+import org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.runtime.message.ReplicaEventMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
 import org.json.JSONException;
@@ -62,10 +63,10 @@
  */
 
 public class AsterixClusterProperties {
-    /**
-     * TODO: currently after instance restarts we require all nodes to join again, otherwise the cluster wont be ACTIVE.
-     * we may overcome this by storing the cluster state before the instance shutdown and using it on startup to identify
-     * the nodes that are expected the join.
+    /*
+     * TODO: currently after instance restarts we require all nodes to join again,
+     * otherwise the cluster wont be ACTIVE. we may overcome this by storing the cluster state before the instance
+     * shutdown and using it on startup to identify the nodes that are expected the join.
      */
 
     private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
@@ -75,7 +76,7 @@
     private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
     private static final String IO_DEVICES = "iodevices";
     private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
-    private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<String, Map<String, String>>();
+    private Map<String, Map<String, String>> activeNcConfiguration = new HashMap<>();
 
     private final Cluster cluster;
     private ClusterState state = ClusterState.UNUSABLE;
@@ -105,24 +106,23 @@
                 Unmarshaller unmarshaller = ctx.createUnmarshaller();
                 cluster = (Cluster) unmarshaller.unmarshal(is);
             } catch (JAXBException e) {
-                throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE);
+                throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE, e);
             }
         } else {
             cluster = null;
         }
         // if this is the CC process
-        if (AsterixAppContextInfo.getInstance() != null) {
-            if (AsterixAppContextInfo.getInstance().getCCApplicationContext() != null) {
-                node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions();
-                clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions();
-                currentMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
-                replicationEnabled = isReplicationEnabled();
-                autoFailover = isAutoFailoverEnabled();
-                if (autoFailover) {
-                    pendingTakeoverRequests = new HashMap<>();
-                    pendingProcessingFailbackPlans = new LinkedList<>();
-                    planId2FailbackPlanMap = new HashMap<>();
-                }
+        if (AsterixAppContextInfo.INSTANCE.initialized()
+                && AsterixAppContextInfo.INSTANCE.getCCApplicationContext() != null) {
+            node2PartitionsMap = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
+            clusterPartitions = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
+            currentMetadataNode = AsterixAppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
+            replicationEnabled = isReplicationEnabled();
+            autoFailover = isAutoFailoverEnabled();
+            if (autoFailover) {
+                pendingTakeoverRequests = new HashMap<>();
+                pendingProcessingFailbackPlans = new LinkedList<>();
+                planId2FailbackPlanMap = new HashMap<>();
             }
         }
     }
@@ -214,12 +214,9 @@
             state = ClusterState.ACTIVE;
             LOGGER.info("Cluster is now ACTIVE");
             //start global recovery
-            AsterixAppContextInfo.getInstance().getGlobalRecoveryManager().startGlobalRecovery();
-            if (autoFailover) {
-                //if there are any pending failback requests, process them
-                if (pendingProcessingFailbackPlans.size() > 0) {
-                    processPendingFailbackPlans();
-                }
+            AsterixAppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
+            if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
+                processPendingFailbackPlans();
             }
         } else {
             requestMetadataNodeTakeover();
@@ -227,26 +224,11 @@
     }
 
     /**
-     * Returns the number of IO devices configured for a Node Controller
-     *
-     * @param nodeId
-     *            unique identifier of the Node Controller
-     * @return number of IO devices. -1 if the node id is not valid. A node id
-     *         is not valid if it does not correspond to the set of registered
-     *         Node Controllers.
-     */
-    public int getNumberOfIODevices(String nodeId) {
-        String[] ioDevs = getIODevices(nodeId);
-        return ioDevs == null ? -1 : ioDevs.length;
-    }
-
-    /**
      * Returns the IO devices configured for a Node Controller
      *
      * @param nodeId
      *            unique identifier of the Node Controller
-     * @return a list of IO devices. null if node id is not valid. A node id is not valid
-     *         if it does not correspond to the set of registered Node Controllers.
+     * @return a list of IO devices.
      */
     public synchronized String[] getIODevices(String nodeId) {
         Map<String, String> ncConfig = activeNcConfiguration.get(nodeId);
@@ -255,7 +237,7 @@
                 LOGGER.warning("Configuration parameters for nodeId " + nodeId
                         + " not found. The node has not joined yet or has left.");
             }
-            return null;
+            return new String[0];
         }
         return ncConfig.get(IO_DEVICES).split(",");
     }
@@ -274,7 +256,7 @@
     }
 
     public synchronized Set<String> getParticipantNodes() {
-        Set<String> participantNodes = new HashSet<String>();
+        Set<String> participantNodes = new HashSet<>();
         for (String pNode : activeNcConfiguration.keySet()) {
             participantNodes.add(pNode);
         }
@@ -316,7 +298,7 @@
     }
 
     public static int getNumberOfNodes() {
-        return AsterixAppContextInfo.getInstance().getMetadataProperties().getNodeNames().size();
+        return AsterixAppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size();
     }
 
     public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
@@ -349,12 +331,12 @@
     private synchronized void requestPartitionsTakeover(String failedNodeId) {
         //replica -> list of partitions to takeover
         Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
-        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE
                 .getReplicationProperties();
 
         //collect the partitions of the failed NC
         List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
-        if (lostPartitions.size() > 0) {
+        if (!lostPartitions.isEmpty()) {
             for (ClusterPartition partition : lostPartitions) {
                 //find replicas for this partitions
                 Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
@@ -362,15 +344,8 @@
                 for (String replica : partitionReplicas) {
                     //TODO (mhubail) currently this assigns the partition to the first found active replica.
                     //It needs to be modified to consider load balancing.
-                    if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
-                        if (!partitionRecoveryPlan.containsKey(replica)) {
-                            List<Integer> replicaPartitions = new ArrayList<>();
-                            replicaPartitions.add(partition.getPartitionId());
-                            partitionRecoveryPlan.put(replica, replicaPartitions);
-                        } else {
-                            partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
-                        }
-                    }
+                    addActiveReplica(replica, partition, partitionRecoveryPlan);
+                    // bug? will always break on first loop execution
                     break;
                 }
             }
@@ -382,11 +357,12 @@
             } else {
                 LOGGER.info("Partitions to recover: " + lostPartitions);
             }
-            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
                     .getCCApplicationContext().getMessageBroker();
             //For each replica, send a request to takeover the assigned partitions
-            for (String replica : partitionRecoveryPlan.keySet()) {
-                Integer[] partitionsToTakeover = partitionRecoveryPlan.get(replica).toArray(new Integer[] {});
+            for (Entry<String, List<Integer>> entry : partitionRecoveryPlan.entrySet()) {
+                String replica = entry.getKey();
+                Integer[] partitionsToTakeover = entry.getValue().toArray(new Integer[entry.getValue().size()]);
                 long requestId = clusterRequestId++;
                 TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId,
                         replica, partitionsToTakeover);
@@ -399,13 +375,25 @@
                      * has failed. When the failure notification arrives, we will send any pending request
                      * that belongs to the failed NC to a different active replica.
                      */
-                    LOGGER.warning("Failed to send takeover request: " + takeoverRequest);
-                    e.printStackTrace();
+                    LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e);
                 }
             }
         }
     }
 
+    private void addActiveReplica(String replica, ClusterPartition partition,
+            Map<String, List<Integer>> partitionRecoveryPlan) {
+        if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
+            if (!partitionRecoveryPlan.containsKey(replica)) {
+                List<Integer> replicaPartitions = new ArrayList<>();
+                replicaPartitions.add(partition.getPartitionId());
+                partitionRecoveryPlan.put(replica, replicaPartitions);
+            } else {
+                partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+            }
+        }
+    }
+
     private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
         List<ClusterPartition> nodePartitions = new ArrayList<>();
         for (ClusterPartition partition : clusterPartitions.values()) {
@@ -436,11 +424,11 @@
 
     private synchronized void requestMetadataNodeTakeover() {
         //need a new node to takeover metadata node
-        ClusterPartition metadataPartiton = AsterixAppContextInfo.getInstance().getMetadataProperties()
+        ClusterPartition metadataPartiton = AsterixAppContextInfo.INSTANCE.getMetadataProperties()
                 .getMetadataPartition();
         //request the metadataPartition node to register itself as the metadata node
         TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
-        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
                 .getCCApplicationContext().getMessageBroker();
         try {
             messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
@@ -450,8 +438,8 @@
              * has failed. When the failure notification arrives, a new NC will be assigned to
              * the metadata partition and a new metadata node takeover request will be sent to it.
              */
-            LOGGER.warning("Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId());
-            e.printStackTrace();
+            LOGGER.log(Level.WARNING,
+                    "Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId(), e);
         }
     }
 
@@ -479,7 +467,7 @@
         planId2FailbackPlanMap.put(plan.getPlanId(), plan);
 
         //get all partitions this node requires to resync
-        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE
                 .getReplicationProperties();
         Set<String> nodeReplicas = replicationProperties.getNodeReplicationClients(failingBackNodeId);
         for (String replicaId : nodeReplicas) {
@@ -531,7 +519,7 @@
                      * if the returning node is the original metadata node,
                      * then metadata node will change after the failback completes
                      */
-                    String originalMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties()
+                    String originalMetadataNode = AsterixAppContextInfo.INSTANCE.getMetadataProperties()
                             .getMetadataNodeName();
                     if (originalMetadataNode.equals(failbackNode)) {
                         plan.setNodeToReleaseMetadataManager(currentMetadataNode);
@@ -541,23 +529,9 @@
 
                     //force new jobs to wait
                     state = ClusterState.REBALANCING;
-
-                    ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+                    ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
                             .getCCApplicationContext().getMessageBroker();
-                    //send requests to other nodes to complete on-going jobs and prepare partitions for failback
-                    Set<PreparePartitionsFailbackRequestMessage> planFailbackRequests = plan.getPlanFailbackRequests();
-                    for (PreparePartitionsFailbackRequestMessage request : planFailbackRequests) {
-                        try {
-                            messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
-                            plan.addPendingRequest(request);
-                        } catch (Exception e) {
-                            LOGGER.warning("Failed to send failback request to: " + request.getNodeID());
-                            e.printStackTrace();
-                            plan.notifyNodeFailure(request.getNodeID());
-                            revertFailedFailbackPlanEffects();
-                            break;
-                        }
-                    }
+                    handleFailbackRequests(plan, messageBroker);
                     /**
                      * wait until the current plan is completed before processing the next plan.
                      * when the current one completes or is reverted, the cluster state will be
@@ -572,6 +546,21 @@
         }
     }
 
+    private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) {
+        //send requests to other nodes to complete on-going jobs and prepare partitions for failback
+        for (PreparePartitionsFailbackRequestMessage request : plan.getPlanFailbackRequests()) {
+            try {
+                messageBroker.sendApplicationMessageToNC(request, request.getNodeID());
+                plan.addPendingRequest(request);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send failback request to: " + request.getNodeID(), e);
+                plan.notifyNodeFailure(request.getNodeID());
+                revertFailedFailbackPlanEffects();
+                break;
+            }
+        }
+    }
+
     public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
         NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
         plan.markRequestCompleted(msg.getRequestId());
@@ -585,13 +574,12 @@
             CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
 
             //send complete resync and takeover partitions to the failing back node
-            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+            ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
                     .getCCApplicationContext().getMessageBroker();
             try {
                 messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
             } catch (Exception e) {
-                LOGGER.warning("Failed to send complete failback request to: " + request.getNodeId());
-                e.printStackTrace();
+                LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
                 notifyFailbackPlansNodeFailure(request.getNodeId());
                 revertFailedFailbackPlanEffects();
             }
@@ -617,7 +605,7 @@
     }
 
     private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
-        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.INSTANCE
                 .getReplicationProperties();
         Set<String> remoteReplicas = replicationProperties.getRemoteReplicasIds(nodeId);
         String nodeIdAddress = "";
@@ -627,7 +615,7 @@
         }
 
         ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
-        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE
                 .getCCApplicationContext().getMessageBroker();
         for (String replica : remoteReplicas) {
             //if the remote replica is alive, send the event
@@ -635,7 +623,7 @@
                 try {
                     messageBroker.sendApplicationMessageToNC(msg, replica);
                 } catch (Exception e) {
-                    e.printStackTrace();
+                    LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
                 }
             }
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixRuntimeComponentsProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixRuntimeComponentsProvider.java
new file mode 100644
index 0000000..abe94bf
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixRuntimeComponentsProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.util;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import org.apache.hyracks.storage.common.IStorageManagerInterface;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.file.IFileMapProvider;
+import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+
+public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
+        ILSMIOOperationSchedulerProvider {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final AsterixRuntimeComponentsProvider RUNTIME_PROVIDER = new AsterixRuntimeComponentsProvider();
+
+    private AsterixRuntimeComponentsProvider() {
+    }
+
+    @Override
+    public ILSMIOOperationScheduler getIOScheduler(IHyracksTaskContext ctx) {
+        return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+                .getLSMIOScheduler();
+    }
+
+    @Override
+    public IBufferCache getBufferCache(IHyracksTaskContext ctx) {
+        return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+                .getBufferCache();
+    }
+
+    @Override
+    public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
+        return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+                .getFileMapManager();
+    }
+
+    @Override
+    public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) {
+        return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+                .getLocalResourceRepository();
+    }
+
+    @Override
+    public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
+        return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+                .getDatasetLifecycleManager();
+    }
+
+    @Override
+    public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
+        return ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject())
+                .getResourceIdFactory();
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
similarity index 74%
rename from asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
index 0e9aa0c..2517df5 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixRuntimeUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.om.util;
+package org.apache.asterix.runtime.util;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
@@ -26,37 +26,40 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 
 /**
  * Utility class for obtaining information on the set of Hyracks NodeController
  * processes that are running on a given host.
  */
-public class AsterixRuntimeUtil {
+public class RuntimeUtils {
 
-    public static Set<String> getNodeControllersOnIP(InetAddress ipAddress) throws Exception {
-        Map<InetAddress, Set<String>> nodeControllerInfo = getNodeControllerMap();
-        Set<String> nodeControllersAtLocation = nodeControllerInfo.get(ipAddress);
-        return nodeControllersAtLocation;
+    private RuntimeUtils() {
     }
 
-    public static List<String> getAllNodeControllers() throws Exception {
+    public static Set<String> getNodeControllersOnIP(InetAddress ipAddress) throws HyracksDataException {
+        Map<InetAddress, Set<String>> nodeControllerInfo = getNodeControllerMap();
+        return nodeControllerInfo.get(ipAddress);
+    }
+
+    public static List<String> getAllNodeControllers() throws HyracksDataException {
         Collection<Set<String>> nodeControllersCollection = getNodeControllerMap().values();
-        List<String> nodeControllers = new ArrayList<String>();
+        List<String> nodeControllers = new ArrayList<>();
         for (Set<String> ncCollection : nodeControllersCollection) {
             nodeControllers.addAll(ncCollection);
         }
         return nodeControllers;
     }
 
-    public static Map<InetAddress, Set<String>> getNodeControllerMap() throws Exception {
-        Map<InetAddress, Set<String>> map = new HashMap<InetAddress, Set<String>>();
-        AsterixAppContextInfo.getInstance().getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
+    public static Map<InetAddress, Set<String>> getNodeControllerMap() throws HyracksDataException {
+        Map<InetAddress, Set<String>> map = new HashMap<>();
+        AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
         return map;
     }
 
-    public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) throws Exception {
-        ClusterControllerService ccs = (ClusterControllerService) AsterixAppContextInfo.getInstance()
+    public static void getNodeControllerMap(Map<InetAddress, Set<String>> map) {
+        ClusterControllerService ccs = (ClusterControllerService) AsterixAppContextInfo.INSTANCE
                 .getCCApplicationContext().getControllerService();
         map.putAll(ccs.getIpAddressNodeNameMap());
     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 420e479..006eac7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -32,6 +32,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -59,22 +60,27 @@
 
 public class PersistentLocalResourceRepository implements ILocalResourceRepository {
 
+    // Public constants
+    public static final String METADATA_FILE_NAME = ".metadata";
+    // Private constants
     private static final Logger LOGGER = Logger.getLogger(PersistentLocalResourceRepository.class.getName());
-    private final String[] mountPoints;
     private static final String STORAGE_METADATA_DIRECTORY = "asterix_root_metadata";
     private static final String STORAGE_METADATA_FILE_NAME_PREFIX = ".asterix_root_metadata";
     private static final long STORAGE_LOCAL_RESOURCE_ID = -4321;
-    public static final String METADATA_FILE_NAME = ".metadata";
-    public static final int STORAGE_VERSION = LIFOMetaDataFrame.VERSION;
-    private final Cache<String, LocalResource> resourceCache;
-    private final String nodeId;
     private static final int MAX_CACHED_RESOURCES = 1000;
-    private IReplicationManager replicationManager;
-    private boolean isReplicationEnabled = false;
-    private Set<String> filesToBeReplicated;
+    private static final FilenameFilter METADATA_FILES_FILTER =
+            (File dir, String name) -> name.equalsIgnoreCase(METADATA_FILE_NAME);
+    // Finals
+    private final String[] mountPoints;
+    private final String nodeId;
+    private final Cache<String, LocalResource> resourceCache;
     private final SortedMap<Integer, ClusterPartition> clusterPartitions;
     private final Set<Integer> nodeOriginalPartitions;
     private final Set<Integer> nodeActivePartitions;
+    // Mutables
+    private boolean isReplicationEnabled = false;
+    private Set<String> filesToBeReplicated;
+    private IReplicationManager replicationManager;
     private Set<Integer> nodeInactivePartitions;
 
     public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId,
@@ -88,8 +94,8 @@
             if (!mountPointDir.exists()) {
                 throw new HyracksDataException(mountPointDir.getAbsolutePath() + " doesn't exist.");
             }
-            if (!mountPoint.endsWith(System.getProperty("file.separator"))) {
-                mountPoints[i] = mountPoint + System.getProperty("file.separator");
+            if (!mountPoint.endsWith(File.separator)) {
+                mountPoints[i] = mountPoint + File.separator;
             } else {
                 mountPoints[i] = mountPoint;
             }
@@ -120,9 +126,13 @@
             LOGGER.info("Initializing local resource repository ... ");
         }
 
-        //create storage metadata file (This file is used to locate the root storage directory after instance restarts).
-        //TODO with the existing cluster configuration file being static and distributed on all NCs, we can find out the storage root
-        //directory without looking at this file. This file could potentially store more information, otherwise no need to keep it.
+        /*
+         * create storage metadata file
+         * (This file is used to locate the root storage directory after instance restarts).
+         * TODO with the existing cluster configuration file being static and distributed on all NCs
+         * we can find out the storage root directory without looking at this file.
+         * This file could potentially store more information, otherwise no need to keep it.
+         */
         for (int i = 0; i < mountPoints.length; i++) {
             File storageMetadataFile = getStorageMetadataFile(mountPoints[i], nodeId, i);
             File storageMetadataDir = storageMetadataFile.getParentFile();
@@ -138,9 +148,9 @@
                     "created the root-metadata-file's directory: " + storageMetadataDir.getAbsolutePath());
 
             String storageRootDirPath;
-            if (storageRootDirName.startsWith(System.getProperty("file.separator"))) {
+            if (storageRootDirName.startsWith(File.separator)) {
                 storageRootDirPath = mountPoints[i]
-                        + storageRootDirName.substring(System.getProperty("file.separator").length());
+                        + storageRootDirName.substring(File.separator.length());
             } else {
                 storageRootDirPath = mountPoints[i] + storageRootDirName;
             }
@@ -215,47 +225,52 @@
         return new File(resourcePath + File.separator + METADATA_FILE_NAME);
     }
 
-    public HashMap<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
+    public Map<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
         //TODO During recovery, the memory usage currently is proportional to the number of resources available.
         //This could be fixed by traversing all resources on disk until the required resource is found.
-        HashMap<Long, LocalResource> resourcesMap = new HashMap<Long, LocalResource>();
-
+        Map<Long, LocalResource> resourcesMap = new HashMap<>();
         for (int i = 0; i < mountPoints.length; i++) {
             File storageRootDir = getStorageRootDirectoryIfExists(mountPoints[i], nodeId, i);
             if (storageRootDir == null) {
                 continue;
             }
-
             //load all local resources.
             File[] partitions = storageRootDir.listFiles();
             for (File partition : partitions) {
                 File[] dataverseFileList = partition.listFiles();
                 if (dataverseFileList != null) {
                     for (File dataverseFile : dataverseFileList) {
-                        if (dataverseFile.isDirectory()) {
-                            File[] indexFileList = dataverseFile.listFiles();
-                            if (indexFileList != null) {
-                                for (File indexFile : indexFileList) {
-                                    if (indexFile.isDirectory()) {
-                                        File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
-                                        if (metadataFiles != null) {
-                                            for (File metadataFile : metadataFiles) {
-                                                LocalResource localResource = readLocalResource(metadataFile);
-                                                resourcesMap.put(localResource.getResourceId(), localResource);
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
+                        loadDataverse(dataverseFile, resourcesMap);
                     }
                 }
             }
         }
-
         return resourcesMap;
     }
 
+    private void loadDataverse(File dataverseFile, Map<Long, LocalResource> resourcesMap) throws HyracksDataException {
+        if (dataverseFile.isDirectory()) {
+            File[] indexFileList = dataverseFile.listFiles();
+            if (indexFileList != null) {
+                for (File indexFile : indexFileList) {
+                    loadIndex(indexFile, resourcesMap);
+                }
+            }
+        }
+    }
+
+    private void loadIndex(File indexFile, Map<Long, LocalResource> resourcesMap) throws HyracksDataException {
+        if (indexFile.isDirectory()) {
+            File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+            if (metadataFiles != null) {
+                for (File metadataFile : metadataFiles) {
+                    LocalResource localResource = readLocalResource(metadataFile);
+                    resourcesMap.put(localResource.getResourceId(), localResource);
+                }
+            }
+        }
+    }
+
     @Override
     public long getMaxResourceID() throws HyracksDataException {
         long maxResourceId = 0;
@@ -273,46 +288,52 @@
                 File[] dataverseFileList = partition.listFiles();
                 if (dataverseFileList != null) {
                     for (File dataverseFile : dataverseFileList) {
-                        if (dataverseFile.isDirectory()) {
-                            File[] indexFileList = dataverseFile.listFiles();
-                            if (indexFileList != null) {
-                                for (File indexFile : indexFileList) {
-                                    if (indexFile.isDirectory()) {
-                                        File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
-                                        if (metadataFiles != null) {
-                                            for (File metadataFile : metadataFiles) {
-                                                LocalResource localResource = readLocalResource(metadataFile);
-                                                maxResourceId = Math.max(maxResourceId, localResource.getResourceId());
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
+                        maxResourceId = getMaxResourceIdForDataverse(dataverseFile, maxResourceId);
                     }
                 }
             }
         }
+        return maxResourceId;
+    }
 
+    private long getMaxResourceIdForDataverse(File dataverseFile, long maxSoFar) throws HyracksDataException {
+        long maxResourceId = maxSoFar;
+        if (dataverseFile.isDirectory()) {
+            File[] indexFileList = dataverseFile.listFiles();
+            if (indexFileList != null) {
+                for (File indexFile : indexFileList) {
+                    maxResourceId = getMaxResourceIdForIndex(indexFile, maxResourceId);
+                }
+            }
+        }
+        return maxResourceId;
+    }
+
+    private long getMaxResourceIdForIndex(File indexFile, long maxSoFar) throws HyracksDataException {
+        long maxResourceId = maxSoFar;
+        if (indexFile.isDirectory()) {
+            File[] metadataFiles = indexFile.listFiles(METADATA_FILES_FILTER);
+            if (metadataFiles != null) {
+                for (File metadataFile : metadataFiles) {
+                    LocalResource localResource = readLocalResource(metadataFile);
+                    maxResourceId = Math.max(maxResourceId, localResource.getResourceId());
+                }
+            }
+        }
         return maxResourceId;
     }
 
     private static String getFileName(String baseDir, long resourceId) {
-        if (resourceId == STORAGE_LOCAL_RESOURCE_ID) {
-            return baseDir;
-        } else {
-            if (!baseDir.endsWith(System.getProperty("file.separator"))) {
-                baseDir += System.getProperty("file.separator");
-            }
-            return baseDir + METADATA_FILE_NAME;
-        }
+        return (resourceId == STORAGE_LOCAL_RESOURCE_ID) ? baseDir
+                : baseDir.endsWith(File.separator) ? (baseDir + METADATA_FILE_NAME)
+                        : (baseDir + File.separator + METADATA_FILE_NAME);
     }
 
     public static LocalResource readLocalResource(File file) throws HyracksDataException {
         try (FileInputStream fis = new FileInputStream(file);
                 ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
             LocalResource resource = (LocalResource) oisFromFis.readObject();
-            if (resource.getVersion() == PersistentLocalResourceRepository.STORAGE_VERSION) {
+            if (resource.getVersion() == LIFOMetaDataFrame.VERSION) {
                 return resource;
             } else {
                 throw new AsterixException("Storage version mismatch.");
@@ -322,23 +343,12 @@
         }
     }
 
-    private static final FilenameFilter METADATA_FILES_FILTER = new FilenameFilter() {
-        @Override
-        public boolean accept(File dir, String name) {
-            if (name.equalsIgnoreCase(METADATA_FILE_NAME)) {
-                return true;
-            } else {
-                return false;
-            }
-        }
-    };
-
     public void setReplicationManager(IReplicationManager replicationManager) {
         this.replicationManager = replicationManager;
         isReplicationEnabled = replicationManager.isReplicationEnabled();
 
         if (isReplicationEnabled) {
-            filesToBeReplicated = new HashSet<String>();
+            filesToBeReplicated = new HashSet<>();
             nodeInactivePartitions = ConcurrentHashMap.newKeySet();
         }
     }
@@ -379,12 +389,9 @@
     public void deleteStorageData(boolean deleteStorageMetadata) throws IOException {
         for (int i = 0; i < mountPoints.length; i++) {
             File storageDir = getStorageRootDirectoryIfExists(mountPoints[i], nodeId, i);
-            if (storageDir != null) {
-                if (storageDir.isDirectory()) {
-                    FileUtils.deleteDirectory(storageDir);
-                }
+            if (storageDir != null && storageDir.isDirectory()) {
+                FileUtils.deleteDirectory(storageDir);
             }
-
             if (deleteStorageMetadata) {
                 //delete the metadata root directory
                 File storageMetadataFile = getStorageMetadataFile(mountPoints[i], nodeId, i);
@@ -405,8 +412,7 @@
     private static File getStorageMetadataFile(String mountPoint, String nodeId, int ioDeviceId) {
         String storageMetadataFileName = getStorageMetadataDirPath(mountPoint, nodeId, ioDeviceId) + File.separator
                 + STORAGE_METADATA_FILE_NAME_PREFIX;
-        File storageMetadataFile = new File(storageMetadataFileName);
-        return storageMetadataFile;
+        return new File(storageMetadataFileName);
     }
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
index 35aeef5..f9618cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
@@ -23,12 +23,13 @@
 import java.util.Set;
 
 import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.topology.ClusterTopology;
 
 public interface ICCContext {
     public ClusterControllerInfo getClusterControllerInfo();
 
-    public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws Exception;
+    public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException;
 
     public ClusterTopology getClusterTopology();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index 1ce4f23..b1fa494 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -36,7 +36,6 @@
     }
 
     private JobId() {
-
     }
 
     public JobId(long id) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
similarity index 72%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
index daeb9c4..bd69f9e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/ResourceIdRequestMessage.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.common.messaging;
+package org.apache.hyracks.api.job;
 
-public class ResourceIdRequestMessage extends AbstractApplicationMessage {
-    private static final long serialVersionUID = 1L;
+public class JobIdFactory {
+    private long id = 0;
 
-    @Override
-    public ApplicationMessageType getMessageType() {
-        return ApplicationMessageType.RESOURCE_ID_REQUEST;
+    public JobId create() {
+        return new JobId(id++);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 58edf60..ae097a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -44,7 +44,9 @@
 import org.apache.hyracks.api.dataset.DatasetDirectoryRecord;
 import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
 import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.job.JobInfo;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.service.IControllerService;
@@ -148,7 +150,7 @@
 
     private final IDatasetDirectoryService datasetDirectoryService;
 
-    private long jobCounter;
+    private final JobIdFactory jobIdFactory;
 
     private final Map<DeploymentId, DeploymentRun> deploymentRunMap;
 
@@ -162,8 +164,8 @@
         this.ccConfig = ccConfig;
         File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
         jobLog = new LogFile(jobLogFolder);
-        nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
-        ipAddressNodeNameMap = new HashMap<InetAddress, Set<String>>();
+        nodeRegistry = new LinkedHashMap<>();
+        ipAddressNodeNameMap = new HashMap<>();
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
         IIPCI ccIPCI = new ClusterControllerIPCI();
         clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
@@ -172,7 +174,7 @@
         clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
                 new JavaSerializationBasedPayloadSerializerDeserializer());
         webServer = new WebServer(this);
-        activeRunMap = new HashMap<JobId, JobRun>();
+        activeRunMap = new HashMap<>();
         runMapArchive = new LinkedHashMap<JobId, JobRun>() {
             private static final long serialVersionUID = 1L;
 
@@ -195,28 +197,12 @@
         workQueue = new WorkQueue("ClusterController", Thread.MAX_PRIORITY);
         this.timer = new Timer(true);
         final ClusterTopology topology = computeClusterTopology(ccConfig);
-        ccContext = new ICCContext() {
-            @Override
-            public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws Exception {
-                GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(ClusterControllerService.this, map);
-                workQueue.scheduleAndSync(ginmw);
-            }
-
-            @Override
-            public ClusterControllerInfo getClusterControllerInfo() {
-                return info;
-            }
-
-            @Override
-            public ClusterTopology getClusterTopology() {
-                return topology;
-            }
-        };
+        ccContext = new ClusterControllerContext(topology);
         sweeper = new DeadNodeSweeper();
         datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
-        jobCounter = 0;
+        jobIdFactory = new JobIdFactory();
 
-        deploymentRunMap = new HashMap<DeploymentId, DeploymentRun>();
+        deploymentRunMap = new HashMap<>();
         stateDumpRunMap = new HashMap<>();
     }
 
@@ -361,10 +347,6 @@
         return appCtx;
     }
 
-    private JobId createJobId() {
-        return new JobId(jobCounter++);
-    }
-
     public ClusterControllerInfo getClusterControllerInfo() {
         return info;
     }
@@ -381,6 +363,34 @@
         return new NetworkAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
     }
 
+    private final class ClusterControllerContext implements ICCContext {
+        private final ClusterTopology topology;
+
+        private ClusterControllerContext(ClusterTopology topology) {
+            this.topology = topology;
+        }
+
+        @Override
+        public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException {
+            GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(ClusterControllerService.this, map);
+            try {
+                workQueue.scheduleAndSync(ginmw);
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        @Override
+        public ClusterControllerInfo getClusterControllerInfo() {
+            return info;
+        }
+
+        @Override
+        public ClusterTopology getClusterTopology() {
+            return topology;
+        }
+    }
+
     private class DeadNodeSweeper extends TimerTask {
         @Override
         public void run() {
@@ -393,6 +403,7 @@
     }
 
     private class HyracksClientInterfaceIPCI implements IIPCI {
+
         @Override
         public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
                 Exception exception) {
@@ -426,7 +437,7 @@
                 case START_JOB: {
                     HyracksClientInterfaceFunctions.StartJobFunction sjf =
                             (HyracksClientInterfaceFunctions.StartJobFunction) fn;
-                    JobId jobId = createJobId();
+                    JobId jobId = jobIdFactory.create();
                     workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
                             sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
                     return;
@@ -693,5 +704,4 @@
     public synchronized ShutdownRun getShutdownRun() {
         return shutdownCallback;
     }
-
 }