Merge branch 'gerrit/neo' into 'gerrit/trinity'

Change-Id: I504ef146b58dc216996b4976937e53088af3c8b4
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 3e2feb9..bf42541 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.rmi.RemoteException;
 import java.rmi.server.UnicastRemoteObject;
+import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -444,17 +445,10 @@
         MetadataNode.INSTANCE.initialize(this, ncExtensionManager.getMetadataTupleTranslatorProvider(),
                 ncExtensionManager.getMetadataExtensions(), partitionId);
 
-        //noinspection unchecked
-        ConcurrentHashMap<CcId, IAsterixStateProxy> proxyMap =
-                (ConcurrentHashMap<CcId, IAsterixStateProxy>) getServiceContext().getDistributedState();
-        if (proxyMap == null) {
-            throw new IllegalStateException("Metadata node cannot access distributed state");
-        }
-
         // This is a special case, we just give the metadataNode directly.
         // This way we can delay the registration of the metadataNode until
         // it is completely initialized.
-        MetadataManager.initialize(proxyMap.values(), MetadataNode.INSTANCE);
+        MetadataManager.initialize(getAsterixStateProxies(), MetadataNode.INSTANCE);
         MetadataBootstrap.startUniverse(getServiceContext(), newUniverse);
         MetadataBootstrap.startDDLRecovery();
         ncExtensionManager.initializeMetadata(getServiceContext());
@@ -482,6 +476,17 @@
         metadataNodeStub = null;
     }
 
+    protected Collection<IAsterixStateProxy> getAsterixStateProxies() {
+        //noinspection unchecked
+        ConcurrentHashMap<CcId, IAsterixStateProxy> proxyMap =
+                (ConcurrentHashMap<CcId, IAsterixStateProxy>) getServiceContext().getDistributedState();
+        if (proxyMap == null) {
+            throw new IllegalStateException("Metadata node cannot access distributed state");
+        }
+
+        return proxyMap.values();
+    }
+
     @Override
     public synchronized void bindMetadataNodeStub(CcId ccId) throws RemoteException {
         if (metadataNodeStub == null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 2a66cfd..bc67823 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -27,6 +27,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -120,7 +121,7 @@
 public class CCApplication extends BaseCCApplication {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static IAsterixStateProxy proxy;
+    private IAsterixStateProxy proxy;
     protected CCExtensionManager ccExtensionManager;
     protected IStorageComponentProvider componentProvider;
     protected WebManager webManager;
@@ -166,8 +167,7 @@
         }
         MetadataProperties metadataProperties = appCtx.getMetadataProperties();
 
-        setAsterixStateProxy(AsterixStateProxy.registerRemoteObject(controllerService.getNetworkSecurityManager(),
-                metadataProperties.getMetadataCallbackPort()));
+        proxy = getAsterixStateProxy(controllerService, metadataProperties);
         ccServiceCtx.setDistributedState(proxy);
         MetadataManager.initialize(proxy, metadataProperties, appCtx);
         ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler());
@@ -362,8 +362,10 @@
         ApplicationConfigurator.registerConfigOptions(configManager);
     }
 
-    public static synchronized void setAsterixStateProxy(IAsterixStateProxy proxy) {
-        CCApplication.proxy = proxy;
+    protected IAsterixStateProxy getAsterixStateProxy(ClusterControllerService controllerService,
+            MetadataProperties metadataProperties) throws RemoteException {
+        return AsterixStateProxy.registerRemoteObject(controllerService.getNetworkSecurityManager(),
+                metadataProperties.getMetadataCallbackPort());
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index f60349f..a223e0d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -88,6 +88,7 @@
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.config.IConfigManager;
@@ -154,7 +155,7 @@
         MetadataBuiltinFunctions.init();
 
         ncExtensionManager = new NCExtensionManager(new ArrayList<>(getExtensions()));
-        runtimeContext = new NCAppRuntimeContext(ncServiceCtx, ncExtensionManager, getPropertiesFactory());
+        runtimeContext = createNCApplicationContext(ncServiceCtx, ncExtensionManager, getPropertiesFactory());
         MetadataProperties metadataProperties = runtimeContext.getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(this.ncServiceCtx.getNodeId())) {
             if (LOGGER.isInfoEnabled()) {
@@ -186,6 +187,12 @@
         performLocalCleanUp();
     }
 
+    protected INcApplicationContext createNCApplicationContext(INCServiceContext ncServiceCtx,
+            NCExtensionManager ncExtensionManager, IPropertiesFactory propertiesFactory)
+            throws IOException, AsterixException {
+        return new NCAppRuntimeContext(ncServiceCtx, ncExtensionManager, propertiesFactory);
+    }
+
     protected IRecoveryManagerFactory getRecoveryManagerFactory() {
         return RecoveryManager::new;
     }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index e8c2c1d..1e096ac 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -64,6 +64,7 @@
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
 import org.kohsuke.args4j.CmdLineException;
 
 @SuppressWarnings({ "squid:ClassVariableVisibilityCheck", "squid:S00112" })
@@ -130,7 +131,7 @@
         configManager.processConfig();
         ccConfig.setKeyStorePath(joinPath(RESOURCES_PATH, ccConfig.getKeyStorePath()));
         ccConfig.setTrustStorePath(joinPath(RESOURCES_PATH, ccConfig.getTrustStorePath()));
-        cc = new ClusterControllerService(ccConfig, ccApplication);
+        cc = createCC(ccApplication, ccConfig);
 
         nodeNames = ccConfig.getConfigManager().getNodeNames();
         if (deleteOldInstanceData && nodeNames != null) {
@@ -151,8 +152,7 @@
             }
             ncApplication.registerConfig(ncConfigManager);
             opts.forEach(opt -> ncConfigManager.set(nodeId, opt.getLeft(), opt.getRight()));
-            nodeControllers
-                    .add(new NodeControllerService(fixupPaths(createNCConfig(nodeId, ncConfigManager)), ncApplication));
+            nodeControllers.add(createNC(fixupPaths(createNCConfig(nodeId, ncConfigManager)), ncApplication));
         }
 
         opts.forEach(opt -> configManager.set(opt.getLeft(), opt.getRight()));
@@ -186,6 +186,7 @@
         this.ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]);
     }
 
+    @NotNull
     private void configureExternalLibDir() {
         // hack to ensure we have a unique location for external libraries in our tests (asterix cluster has a shared
         // home directory)-- TODO: rework this once the external lib dir can be configured explicitly
@@ -224,10 +225,20 @@
         return ccConfig;
     }
 
+    protected ClusterControllerService createCC(ICCApplication ccApplication, CCConfig ccConfig) throws Exception {
+        return new ClusterControllerService(ccConfig, ccApplication);
+    }
+
     protected ICCApplication createCCApplication() {
         return new CCApplication();
     }
 
+    @NotNull
+    protected NodeControllerService createNC(NCConfig config, INCApplication ncApplication)
+            throws IOException, CmdLineException, AsterixException {
+        return new NodeControllerService(config, ncApplication);
+    }
+
     protected NCConfig createNCConfig(String ncName, ConfigManager configManager) {
         NCConfig ncConfig = new NCConfig(ncName, configManager);
         ncConfig.setClusterAddress("localhost");
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
index cedcccf..b4aeaf1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
@@ -39,10 +39,11 @@
     private static final Logger LOGGER = LogManager.getLogger();
 
     private IMetadataNode metadataNode;
-    private static final IAsterixStateProxy cc = new AsterixStateProxy();
+    private static IAsterixStateProxy cc;
 
     public static IAsterixStateProxy registerRemoteObject(INetworkSecurityManager networkSecurityManager,
             int metadataCallbackPort) throws RemoteException {
+        cc = new AsterixStateProxy();
         IAsterixStateProxy stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort,
                 RMIClientFactory.getSocketFactory(networkSecurityManager),
                 RMIServerFactory.getSocketFactory(networkSecurityManager));
@@ -51,8 +52,10 @@
     }
 
     public static void unregisterRemoteObject() throws RemoteException {
-        UnicastRemoteObject.unexportObject(cc, true);
-        LOGGER.info("Asterix Distributed State Proxy Unbound");
+        if (cc != null) {
+            UnicastRemoteObject.unexportObject(cc, true);
+            LOGGER.info("Asterix Distributed State Proxy Unbound");
+        }
     }
 
     @Override