Enable Adding Nodes to Running *DB Cluster

Also ability to configure unique partition ids without having access to
complete cluster topology

Change-Id: If978442a95687c00ef78c89ed1b4440f5e308b99
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1785
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index 396665e..f1a123c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -69,10 +69,11 @@
     protected void get(IServletRequest request, IServletResponse response) {
         response.setStatus(HttpResponseStatus.OK);
         try {
-            HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8);
+            HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
         } catch (IOException e) {
             LOGGER.log(Level.WARNING, "Failure setting content type", e);
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            response.writer().write(e.toString());
             return;
         }
         PrintWriter out = response.writer();
@@ -84,7 +85,6 @@
             if (dataverseName == null || datasetName == null) {
                 jsonResponse.put("error", "Parameter dataverseName or datasetName is null,");
                 out.write(jsonResponse.toString());
-                out.flush();
                 return;
             }
 
@@ -127,15 +127,15 @@
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 // Writes file splits.
                 out.write(jsonResponse.toString());
-                out.flush();
             } finally {
                 metadataProvider.getLocks().unlock();
             }
         } catch (Exception e) {
             LOGGER.log(Level.WARNING, "Failure handling a request", e);
-            out.println(e.getMessage());
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            out.write(e.toString());
+        } finally {
             out.flush();
-            e.printStackTrace(out);
         }
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 479c8b0..da04c52 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
@@ -320,7 +319,7 @@
         param.path = servletPath(request);
         if (HttpUtil.ContentType.APPLICATION_JSON.equals(contentType)) {
             try {
-                JsonNode jsonRequest = new ObjectMapper().readTree(getRequestBody(request));
+                JsonNode jsonRequest = new ObjectMapper().readTree(HttpUtil.getRequestBody(request));
                 param.statement = jsonRequest.get(Parameter.STATEMENT.str()).asText();
                 param.format = toLower(getOptText(jsonRequest, Parameter.FORMAT.str()));
                 param.pretty = getOptBoolean(jsonRequest, Parameter.PRETTY.str(), false);
@@ -333,7 +332,7 @@
         } else {
             param.statement = request.getParameter(Parameter.STATEMENT.str());
             if (param.statement == null) {
-                param.statement = getRequestBody(request);
+                param.statement = HttpUtil.getRequestBody(request);
             }
             param.format = toLower(request.getParameter(Parameter.FORMAT.str()));
             param.pretty = Boolean.parseBoolean(request.getParameter(Parameter.PRETTY.str()));
@@ -343,10 +342,6 @@
         return param;
     }
 
-    private static String getRequestBody(IServletRequest request) throws IOException {
-        return request.getHttpRequest().content().toString(StandardCharsets.UTF_8);
-    }
-
     private static ResultDelivery parseResultDelivery(String mode) {
         if ("async".equals(mode)) {
             return ResultDelivery.ASYNC;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 6b1e408..18aae8e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -22,7 +22,6 @@
 import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
@@ -222,7 +221,7 @@
     //TODO: Both Get and Post of this API must use the same parameter names
     private String query(IServletRequest request) {
         if (request.getHttpRequest().method() == HttpMethod.POST) {
-            return request.getHttpRequest().content().toString(StandardCharsets.UTF_8);
+            return HttpUtil.getRequestBody(request);
         } else {
             return getQueryParameter(request);
         }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
index 1abc3f0..bc069b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
@@ -25,6 +25,7 @@
 
 import org.apache.asterix.app.replication.message.CompleteFailbackRequestMessage;
 import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;
+import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage;
 
 public class NodeFailbackPlan {
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 1816a25..2a1fd0b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -27,7 +27,6 @@
 
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -86,16 +85,6 @@
     }
 
     @Override
-    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
-        // Do nothing
-    }
-
-    @Override
-    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
-        // Do nothing?
-    }
-
-    @Override
     public void startGlobalRecovery(ICcApplicationContext appCtx) {
         // perform global recovery if state changed to active
         final ClusterState newState = ClusterStateManager.INSTANCE.getState();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
index fef4e31..c3cf86b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
@@ -19,6 +19,7 @@
  * under the License.
  */
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Set;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
@@ -27,26 +28,35 @@
 
     /**
      * @param deadNodeIds
-     * @return
+     * @return set of work to execute as a result of this node failure
      */
-    public Set<IClusterManagementWork> notifyNodeFailure(Collection<String> deadNodeIds);
+    default Set<IClusterManagementWork> notifyNodeFailure(Collection<String> deadNodeIds) {
+        // default is no-op
+        return Collections.emptySet();
+    }
 
     /**
      * @param joinedNodeId
-     * @return
+     * @return set of work to execute as a result of this node join
      */
-    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId);
+    default Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
+        // default is no-op
+        return Collections.emptySet();
+    }
 
     /**
      * @param response
      */
-    public void notifyRequestCompletion(IClusterManagementWorkResponse response);
+    default void notifyRequestCompletion(IClusterManagementWorkResponse response) {
+        // default is no-op
+    }
 
     /**
      * @param previousState
      * @param newState
      */
-    public void notifyStateChange(ClusterState previousState, ClusterState newState);
-
+    default void notifyStateChange(ClusterState previousState, ClusterState newState) {
+        // default is no-op
+    }
 
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a753db3..a5686fd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -97,4 +98,13 @@
     boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit)
             throws HyracksDataException, InterruptedException;
 
+    /**
+     * Register the specified node partitions with the specified nodeId with this cluster state manager
+     */
+    void registerNodePartitions(String nodeId, ClusterPartition[] nodePartitions) throws AsterixException;
+
+    /**
+     * De-register the specified node's partitions from this cluster state manager
+     */
+    void deregisterNodePartitions(String nodeId);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index 1d09fff..7f09b98 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -43,7 +43,11 @@
                 appConfig -> FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "txn-log"),
                 "The directory where transaction logs should be stored",
                 "<value of " + ControllerConfig.Option.DEFAULT_DIR.cmdline() + ">/txn-log"),
-        STORAGE_SUBDIR(OptionTypes.STRING, "storage", "The subdirectory name under each iodevice used for storage"),;
+        STORAGE_SUBDIR(OptionTypes.STRING, "storage", "The subdirectory name under each iodevice used for storage"),
+        STARTING_PARTITION_ID(
+                OptionTypes.INTEGER,
+                -1,
+                "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -92,7 +96,7 @@
 
         @Override
         public boolean hidden() {
-            return this == INITIAL_RUN;
+            return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
         }
 
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index c5ec1c0..d011864 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -30,6 +30,7 @@
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +57,7 @@
 import org.apache.asterix.common.configuration.Store;
 import org.apache.asterix.common.configuration.TransactionLogDir;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.utils.ConfigUtil;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -78,7 +80,7 @@
     private final Map<String, String> transactionLogDirs = new HashMap<>();
     private final Map<String, String> asterixBuildProperties = new HashMap<>();
     private final Map<String, ClusterPartition[]> nodePartitionsMap;
-    private final SortedMap<Integer, ClusterPartition> clusterPartitions = new TreeMap<>();
+    private final SortedMap<Integer, ClusterPartition> clusterPartitions;
     // For extensions
     private final List<AsterixExtension> extensions;
 
@@ -87,19 +89,20 @@
      */
     private PropertiesAccessor(IApplicationConfig cfg) throws AsterixException, IOException {
         this.cfg = cfg;
-        nodePartitionsMap = new HashMap<>();
+        nodePartitionsMap = new ConcurrentHashMap<>();
+        clusterPartitions = Collections.synchronizedSortedMap(new TreeMap<>());
         extensions = new ArrayList<>();
         // Determine whether to use old-style asterix-configuration.xml or new-style configuration.
         // QQQ strip this out eventually
         // QQQ this is NOT a good way to determine whether to use config file
-        ConfigManager configManager = ((ConfigManagerApplicationConfig)cfg).getConfigManager();
+        ConfigManager configManager = ((ConfigManagerApplicationConfig) cfg).getConfigManager();
         boolean usingConfigFile = Stream
                 .of((IOption) ControllerConfig.Option.CONFIG_FILE, ControllerConfig.Option.CONFIG_FILE_URL)
                 .map(configManager::get).anyMatch(Objects::nonNull);
         AsterixConfiguration asterixConfiguration = null;
         try {
-            asterixConfiguration = configure(System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY,
-                    GlobalConfig.DEFAULT_CONFIG_FILE_NAME));
+            asterixConfiguration = configure(
+                    System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY, GlobalConfig.DEFAULT_CONFIG_FILE_NAME));
         } catch (Exception e) {
             // cannot load config file, assume new-style config
         }
@@ -123,6 +126,7 @@
             // partition directory (as formed by appending the <store> subdirectory to
             // each <iodevices> path from the user's original cluster.xml).
             for (Store store : configuredStores) {
+                configManager.set(store.getNcId(), NodeProperties.Option.STARTING_PARTITION_ID, uniquePartitionId);
                 String trimmedStoreDirs = store.getStoreDirs().trim();
                 String[] nodeStores = trimmedStoreDirs.split(",");
                 ClusterPartition[] nodePartitions = new ClusterPartition[nodeStores.length];
@@ -153,8 +157,8 @@
                         continue;
                     }
                     if (option != null) {
-                        throw new IllegalStateException("ERROR: option found in multiple sections: " +
-                                Arrays.asList(option, optionTemp));
+                        throw new IllegalStateException(
+                                "ERROR: option found in multiple sections: " + Arrays.asList(option, optionTemp));
                     }
                     option = optionTemp;
                 }
@@ -175,12 +179,12 @@
             MutableInt uniquePartitionId = new MutableInt(0);
             // Iterate through each configured NC.
             for (String ncName : cfg.getNCNames()) {
-                configureNc(ncName, uniquePartitionId);
+                configureNc(configManager, ncName, uniquePartitionId);
             }
             for (String section : cfg.getSectionNames()) {
                 if (section.startsWith(AsterixProperties.SECTION_PREFIX_EXTENSION)) {
-                    String className = AsterixProperties.getSectionId(
-                            AsterixProperties.SECTION_PREFIX_EXTENSION, section);
+                    String className = AsterixProperties.getSectionId(AsterixProperties.SECTION_PREFIX_EXTENSION,
+                            section);
                     configureExtension(className, section);
                 }
             }
@@ -230,15 +234,21 @@
         extensions.add(new AsterixExtension(className, kvs));
     }
 
-    private void configureNc(String ncId, MutableInt uniquePartitionId) {
+    private void configureNc(ConfigManager configManager, String ncId, MutableInt uniquePartitionId)
+            throws AsterixException {
 
         // Now we assign the coredump and txnlog directories for this node.
         // QQQ Default values? Should they be specified here? Or should there
         // be a default.ini? Certainly wherever they are, they should be platform-dependent.
         IApplicationConfig nodeCfg = cfg.getNCEffectiveConfig(ncId);
         coredumpConfig.put(ncId, nodeCfg.getString(NodeProperties.Option.CORE_DUMP_DIR));
-        transactionLogDirs.put(ncId,
-                nodeCfg.getString(NodeProperties.Option.TXN_LOG_DIR));
+        transactionLogDirs.put(ncId, nodeCfg.getString(NodeProperties.Option.TXN_LOG_DIR));
+        int partitionId = nodeCfg.getInt(NodeProperties.Option.STARTING_PARTITION_ID);
+        if (partitionId != -1) {
+            uniquePartitionId.setValue(partitionId);
+        } else {
+            configManager.set(ncId, NodeProperties.Option.STARTING_PARTITION_ID, uniquePartitionId.getValue());
+        }
 
         // Now we create an array of ClusterPartitions for all the partitions
         // on this NC.
@@ -250,9 +260,12 @@
             // Construct final storage path from iodevice dir + storage subdirs
             nodeStores[i] = iodevices[i] + File.separator + storageSubdir;
             // Create ClusterPartition instances for this NC.
-            ClusterPartition partition = new ClusterPartition(uniquePartitionId.getValue(), ncId, i);
-            uniquePartitionId.increment();
-            clusterPartitions.put(partition.getPartitionId(), partition);
+            ClusterPartition partition = new ClusterPartition(uniquePartitionId.getAndIncrement(), ncId, i);
+            ClusterPartition orig = clusterPartitions.put(partition.getPartitionId(), partition);
+            if (orig != null) {
+                throw AsterixException.create(ErrorCode.DUPLICATE_PARTITION_ID, partition.getPartitionId(), ncId,
+                        orig.getNodeId());
+            }
             nodePartitions[i] = partition;
         }
         stores.put(ncId, nodeStores);
@@ -302,8 +315,8 @@
             return value == null ? defaultValue : interpreter.parse(value);
         } catch (IllegalArgumentException e) {
             if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.severe("Invalid property value '" + value + "' for property '" + property + "'.\n" +
-                        "Default = " + defaultValue);
+                LOGGER.severe("Invalid property value '" + value + "' for property '" + property + "'.\n" + "Default = "
+                        + defaultValue);
             }
             throw e;
         }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 35b7d4c..eb73e5e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -187,6 +187,9 @@
     public static final int PROVIDER_STREAM_RECORD_READER_WRONG_CONFIGURATION = 3086;
     public static final int FEED_CONNECT_FEED_APPLIED_INVALID_FUNCTION = 3087;
 
+    // Lifecycle management errors
+    public static final int DUPLICATE_PARTITION_ID = 4000;
+
     private ErrorCode() {
     }
 
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 1f80fad..026f71a 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -174,4 +174,7 @@
 3084 = Duplicate record reader format: %1$s
 3085 = Unknown Adapter Name.
 3086 = Cannot find record reader %1$s with specified configuration.
-3087 = Cannot find function %1$s
\ No newline at end of file
+3087 = Cannot find function %1$s
+
+# Lifecycle management errors
+4000 = Partition id %1$d for node %2$s already in use by node %3$s
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/RetainLogsRule.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/RetainLogsRule.java
index 39bd5e7..2077ad5 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/RetainLogsRule.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/RetainLogsRule.java
@@ -19,6 +19,9 @@
 package org.apache.asterix.test.base;
 
 import java.io.File;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.reflect.Method;
 
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
@@ -26,15 +29,17 @@
 public class RetainLogsRule extends TestWatcher {
     private final File baseDir;
     private final File destDir;
+    private final Object instance;
     private long startTime;
 
-    public RetainLogsRule(File baseDir, File destDir) {
+    public RetainLogsRule(File baseDir, File destDir, Object instance) {
         this.baseDir = baseDir;
         this.destDir = destDir;
+        this.instance = instance;
     }
 
-    public RetainLogsRule(String baseDir, String destDir) {
-        this(new File(baseDir), new File(destDir));
+    public RetainLogsRule(String baseDir, String destDir, Object instance) {
+        this(new File(baseDir), new File(destDir), instance);
     }
 
     @Override
@@ -44,7 +49,7 @@
 
     @Override
     protected void failed(Throwable e, Description description) {
-        File reportDir = new File(destDir, description.getTestClass().getName() + "." + description.getMethodName());
+        File reportDir = new File(destDir, description.getTestClass().getSimpleName() + "." + description.getMethodName());
         reportDir.mkdirs();
         try {
             AsterixTestHelper.deepSelectiveCopy(baseDir, reportDir,
@@ -54,4 +59,23 @@
             e1.printStackTrace();
         }
     }
+
+    @Override
+    protected void finished(Description description) {
+        if (instance != null) {
+            for (Method m : instance.getClass().getMethods()) {
+                if (m.isAnnotationPresent(After.class)) {
+                    try {
+                        m.invoke(instance);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        }
+    }
+
+    @Retention(RetentionPolicy.RUNTIME)
+    public @interface After {
+    }
 }
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
index 9d0a1db..06e5aed 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
@@ -67,7 +67,7 @@
 
     @Rule
     public TestRule retainLogs = new RetainLogsRule(
-            AsterixInstallerIntegrationUtil.getManagixHome(), reportPath);
+            AsterixInstallerIntegrationUtil.getManagixHome(), reportPath, this);
 
     @BeforeClass
     public static void setUp() throws Exception {
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
index 6e9dd44..c379517 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
@@ -50,7 +50,7 @@
 
     @Rule
     public TestRule retainLogs = new RetainLogsRule(
-            AsterixInstallerIntegrationUtil.getManagixHome(), reportPath);
+            AsterixInstallerIntegrationUtil.getManagixHome(), reportPath, this);
 
     @BeforeClass
     public static void setUp() throws Exception {
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixRestartIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixRestartIT.java
index ce4de7c..d3fdc4a 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixRestartIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixRestartIT.java
@@ -63,7 +63,7 @@
     }
 
     @Rule
-    public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath);
+    public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath, this);
 
     @BeforeClass
     public static void setUp() throws Exception {
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/MetadataReplicationIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/MetadataReplicationIT.java
index 7a0a797..7fe156a 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/MetadataReplicationIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/MetadataReplicationIT.java
@@ -26,11 +26,10 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.event.model.AsterixInstance.State;
-import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.test.base.RetainLogsRule;
+import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.commons.lang3.StringUtils;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -62,7 +61,7 @@
     }
 
     @Rule
-    public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath);
+    public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath, this);
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -89,7 +88,7 @@
         LOGGER.info("Instance is in ACTIVE state.");
     }
 
-    @After
+    @RetainLogsRule.After
     public void after() throws Exception {
         LOGGER.info("Destroying instance...");
         AsterixInstallerIntegrationUtil.deinit();
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
index 44ce7f8..21f382b 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
@@ -29,7 +29,6 @@
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.hyracks.util.file.FileUtil;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -60,7 +59,7 @@
     }
 
     @Rule
-    public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath);
+    public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath, this);
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -86,7 +85,7 @@
         LOGGER.info("Instance is in ACTIVE state.");
     }
 
-    @After
+    @RetainLogsRule.After
     public void after() throws Exception {
         LOGGER.info("Destroying instance...");
         AsterixInstallerIntegrationUtil.deinit();
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/DmlRecoveryIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/DmlRecoveryIT.java
index a2968a5..4f04e79 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/DmlRecoveryIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/DmlRecoveryIT.java
@@ -60,7 +60,7 @@
     private final TestExecutor testExecutor = new TestExecutor();
 
     @Rule
-    public TestRule retainLogs = new RetainLogsRule(managixHomePath, reportPath);
+    public TestRule retainLogs = new RetainLogsRule(managixHomePath, reportPath, this);
 
     @BeforeClass
     public static void setUp() throws Exception {
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/RecoveryIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/RecoveryIT.java
index a0612cc..0e7ac8b 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/RecoveryIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/RecoveryIT.java
@@ -58,7 +58,7 @@
     private final TestExecutor testExecutor = new TestExecutor();
 
     @Rule
-    public TestRule retainLogs = new RetainLogsRule(managixHomePath, reportPath);
+    public TestRule retainLogs = new RetainLogsRule(managixHomePath, reportPath, this);
 
     @BeforeClass
     public static void setUp() throws Exception {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 48937f8..64ef5c2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.runtime.utils;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -35,6 +36,8 @@
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
@@ -249,8 +252,8 @@
                 clusterActiveLocations.add(p.getActiveNodeId());
             }
         }
-        clusterPartitionConstraint =
-                new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {}));
+        clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+                clusterActiveLocations.toArray(new String[] {}));
     }
 
     public boolean isGlobalRecoveryCompleted() {
@@ -350,4 +353,34 @@
     public String getCurrentMetadataNodeId() {
         return currentMetadataNode;
     }
+
+    @Override
+    public synchronized void registerNodePartitions(String nodeId, ClusterPartition[] nodePartitions)
+            throws AsterixException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Registering node partitions for node " + nodeId + ": " + Arrays.toString(nodePartitions));
+        }
+        // We want to make sure there are no conflicts; make two passes for simplicity...
+        for (ClusterPartition nodePartition : nodePartitions) {
+            if (clusterPartitions.containsKey(nodePartition.getPartitionId())) {
+                throw AsterixException.create(ErrorCode.DUPLICATE_PARTITION_ID, nodePartition.getPartitionId(), nodeId,
+                        clusterPartitions.get(nodePartition.getPartitionId()).getNodeId());
+            }
+        }
+        for (ClusterPartition nodePartition : nodePartitions) {
+            clusterPartitions.put(nodePartition.getPartitionId(), nodePartition);
+        }
+        node2PartitionsMap.put(nodeId, nodePartitions);
+    }
+
+    @Override
+    public synchronized void deregisterNodePartitions(String nodeId) {
+        ClusterPartition [] nodePartitions = node2PartitionsMap.remove(nodeId);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Deegistering node partitions for node " + nodeId + ": " + Arrays.toString(nodePartitions));
+        }
+        for (ClusterPartition nodePartition : nodePartitions) {
+            clusterPartitions.remove(nodePartition.getPartitionId());
+        }
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
index b1f0892..64e328a 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
@@ -31,7 +31,9 @@
     private final int errorCode;
     private final Serializable[] params;
     private final String nodeId;
-    private transient volatile String msgCache;
+
+    @SuppressWarnings("squid:S1165") // exception class not final
+    private transient CachedMessage msgCache;
 
     public AlgebricksException(String component, int errorCode, String message, Throwable cause, String nodeId,
             Serializable... params) {
@@ -42,16 +44,12 @@
         this.params = params;
     }
 
-    public static AlgebricksException create(int errorCode, Serializable... params) {
-        return new AlgebricksException(ErrorCode.HYRACKS, errorCode, ErrorCode.getErrorMessage(errorCode), params);
-    }
-
     /**
      * @deprecated Error code is needed.
      */
     @Deprecated
     public AlgebricksException(String message) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null);
+        this(ErrorMessageUtil.NONE, UNKNOWN, message, null, (Serializable[]) null);
     }
 
     /**
@@ -59,23 +57,7 @@
      */
     @Deprecated
     public AlgebricksException(Throwable cause) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
-    }
-
-    /**
-     * @deprecated Error code is needed.
-     */
-    @Deprecated
-    public AlgebricksException(Throwable cause, String nodeId) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
-    }
-
-    /**
-     * @deprecated Error code is needed.
-     */
-    @Deprecated
-    public AlgebricksException(String message, Throwable cause, String nodeId) {
-        this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, nodeId);
+        this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, (Serializable[]) null);
     }
 
     /**
@@ -107,6 +89,10 @@
         this(component, errorCode, message, cause, null, params);
     }
 
+    public static AlgebricksException create(int errorCode, Serializable... params) {
+        return new AlgebricksException(ErrorCode.HYRACKS, errorCode, ErrorCode.getErrorMessage(errorCode), params);
+    }
+
     public String getComponent() {
         return component;
     }
@@ -122,10 +108,17 @@
     @Override
     public String getMessage() {
         if (msgCache == null) {
-            synchronized (this) {
-                msgCache = ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params);
-            }
+            msgCache = new CachedMessage(
+                    ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params));
         }
-        return msgCache;
+        return msgCache.message;
+    }
+
+    private static class CachedMessage {
+        private final String message;
+
+        private CachedMessage(String message) {
+            this.message = message;
+        }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ErrorMessageUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ErrorMessageUtil.java
index 390e2b5..467d148 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ErrorMessageUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ErrorMessageUtil.java
@@ -19,7 +19,6 @@
 
 package org.apache.hyracks.api.util;
 
-import java.io.IOError;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
@@ -30,8 +29,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hyracks.api.exceptions.ErrorCode;
-
 public class ErrorMessageUtil {
 
     private static final Logger LOGGER = Logger.getLogger(ErrorMessageUtil.class.getName());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index d6d8bc4..47e78a3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -84,19 +84,14 @@
         }
         // Updates the node registry.
         if (nodeRegistry.containsKey(nodeId)) {
-            LOGGER.warning("Node with name " + nodeId + " has already registered.");
-            return;
+            LOGGER.warning("Node with name " + nodeId + " has already registered; re-registering");
         }
         nodeRegistry.put(nodeId, ncState);
 
         // Updates the IP address to node names map.
         try {
             InetAddress ipAddress = getIpAddress(ncState);
-            Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
-            if (nodes == null) {
-                nodes = new HashSet<>();
-                ipAddressNodeNameMap.put(ipAddress, nodes);
-            }
+            Set<String> nodes = ipAddressNodeNameMap.computeIfAbsent(ipAddress, k -> new HashSet<>());
             nodes.add(nodeId);
         } catch (HyracksException e) {
             // If anything fails, we ignore the node.
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index 45763fa..c11deef 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.http.server.utils;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 
@@ -74,6 +75,10 @@
         return request.method() == HttpMethod.POST ? PostRequest.create(request) : BaseRequest.create(request);
     }
 
+    public static String getRequestBody(IServletRequest request) {
+        return request.getHttpRequest().content().toString(StandardCharsets.UTF_8);
+    }
+
     public static void setContentType(IServletResponse response, String type, String charset) throws IOException {
         response.setHeader(HttpHeaderNames.CONTENT_TYPE, type + "; charset=" + charset);
     }