Enable Adding Nodes to Running *DB Cluster
Also ability to configure unique partition ids without having access to
complete cluster topology
Change-Id: If978442a95687c00ef78c89ed1b4440f5e308b99
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1785
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index 396665e..f1a123c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -69,10 +69,11 @@
protected void get(IServletRequest request, IServletResponse response) {
response.setStatus(HttpResponseStatus.OK);
try {
- HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8);
+ HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Failure setting content type", e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ response.writer().write(e.toString());
return;
}
PrintWriter out = response.writer();
@@ -84,7 +85,6 @@
if (dataverseName == null || datasetName == null) {
jsonResponse.put("error", "Parameter dataverseName or datasetName is null,");
out.write(jsonResponse.toString());
- out.flush();
return;
}
@@ -127,15 +127,15 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
// Writes file splits.
out.write(jsonResponse.toString());
- out.flush();
} finally {
metadataProvider.getLocks().unlock();
}
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Failure handling a request", e);
- out.println(e.getMessage());
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ out.write(e.toString());
+ } finally {
out.flush();
- e.printStackTrace(out);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 479c8b0..da04c52 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
@@ -320,7 +319,7 @@
param.path = servletPath(request);
if (HttpUtil.ContentType.APPLICATION_JSON.equals(contentType)) {
try {
- JsonNode jsonRequest = new ObjectMapper().readTree(getRequestBody(request));
+ JsonNode jsonRequest = new ObjectMapper().readTree(HttpUtil.getRequestBody(request));
param.statement = jsonRequest.get(Parameter.STATEMENT.str()).asText();
param.format = toLower(getOptText(jsonRequest, Parameter.FORMAT.str()));
param.pretty = getOptBoolean(jsonRequest, Parameter.PRETTY.str(), false);
@@ -333,7 +332,7 @@
} else {
param.statement = request.getParameter(Parameter.STATEMENT.str());
if (param.statement == null) {
- param.statement = getRequestBody(request);
+ param.statement = HttpUtil.getRequestBody(request);
}
param.format = toLower(request.getParameter(Parameter.FORMAT.str()));
param.pretty = Boolean.parseBoolean(request.getParameter(Parameter.PRETTY.str()));
@@ -343,10 +342,6 @@
return param;
}
- private static String getRequestBody(IServletRequest request) throws IOException {
- return request.getHttpRequest().content().toString(StandardCharsets.UTF_8);
- }
-
private static ResultDelivery parseResultDelivery(String mode) {
if ("async".equals(mode)) {
return ResultDelivery.ASYNC;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 6b1e408..18aae8e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -22,7 +22,6 @@
import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
@@ -222,7 +221,7 @@
//TODO: Both Get and Post of this API must use the same parameter names
private String query(IServletRequest request) {
if (request.getHttpRequest().method() == HttpMethod.POST) {
- return request.getHttpRequest().content().toString(StandardCharsets.UTF_8);
+ return HttpUtil.getRequestBody(request);
} else {
return getQueryParameter(request);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
index 1abc3f0..bc069b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NodeFailbackPlan.java
@@ -25,6 +25,7 @@
import org.apache.asterix.app.replication.message.CompleteFailbackRequestMessage;
import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage;
+import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage;
public class NodeFailbackPlan {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 1816a25..2a1fd0b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -27,7 +27,6 @@
import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -86,16 +85,6 @@
}
@Override
- public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
- // Do nothing
- }
-
- @Override
- public void notifyStateChange(ClusterState previousState, ClusterState newState) {
- // Do nothing?
- }
-
- @Override
public void startGlobalRecovery(ICcApplicationContext appCtx) {
// perform global recovery if state changed to active
final ClusterState newState = ClusterStateManager.INSTANCE.getState();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
index fef4e31..c3cf86b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterEventsSubscriber.java
@@ -19,6 +19,7 @@
* under the License.
*/
import java.util.Collection;
+import java.util.Collections;
import java.util.Set;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
@@ -27,26 +28,35 @@
/**
* @param deadNodeIds
- * @return
+ * @return set of work to execute as a result of this node failure
*/
- public Set<IClusterManagementWork> notifyNodeFailure(Collection<String> deadNodeIds);
+ default Set<IClusterManagementWork> notifyNodeFailure(Collection<String> deadNodeIds) {
+ // default is no-op
+ return Collections.emptySet();
+ }
/**
* @param joinedNodeId
- * @return
+ * @return set of work to execute as a result of this node join
*/
- public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId);
+ default Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
+ // default is no-op
+ return Collections.emptySet();
+ }
/**
* @param response
*/
- public void notifyRequestCompletion(IClusterManagementWorkResponse response);
+ default void notifyRequestCompletion(IClusterManagementWorkResponse response) {
+ // default is no-op
+ }
/**
* @param previousState
* @param newState
*/
- public void notifyStateChange(ClusterState previousState, ClusterState newState);
-
+ default void notifyStateChange(ClusterState previousState, ClusterState newState) {
+ // default is no-op
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a753db3..a5686fd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -22,6 +22,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -97,4 +98,13 @@
boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit)
throws HyracksDataException, InterruptedException;
+ /**
+ * Register the specified node partitions with the specified nodeId with this cluster state manager
+ */
+ void registerNodePartitions(String nodeId, ClusterPartition[] nodePartitions) throws AsterixException;
+
+ /**
+ * De-register the specified node's partitions from this cluster state manager
+ */
+ void deregisterNodePartitions(String nodeId);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index 1d09fff..7f09b98 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -43,7 +43,11 @@
appConfig -> FileUtil.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "txn-log"),
"The directory where transaction logs should be stored",
"<value of " + ControllerConfig.Option.DEFAULT_DIR.cmdline() + ">/txn-log"),
- STORAGE_SUBDIR(OptionTypes.STRING, "storage", "The subdirectory name under each iodevice used for storage"),;
+ STORAGE_SUBDIR(OptionTypes.STRING, "storage", "The subdirectory name under each iodevice used for storage"),
+ STARTING_PARTITION_ID(
+ OptionTypes.INTEGER,
+ -1,
+ "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
private final IOptionType type;
private final Object defaultValue;
@@ -92,7 +96,7 @@
@Override
public boolean hidden() {
- return this == INITIAL_RUN;
+ return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index c5ec1c0..d011864 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -30,6 +30,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -56,6 +57,7 @@
import org.apache.asterix.common.configuration.Store;
import org.apache.asterix.common.configuration.TransactionLogDir;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.utils.ConfigUtil;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -78,7 +80,7 @@
private final Map<String, String> transactionLogDirs = new HashMap<>();
private final Map<String, String> asterixBuildProperties = new HashMap<>();
private final Map<String, ClusterPartition[]> nodePartitionsMap;
- private final SortedMap<Integer, ClusterPartition> clusterPartitions = new TreeMap<>();
+ private final SortedMap<Integer, ClusterPartition> clusterPartitions;
// For extensions
private final List<AsterixExtension> extensions;
@@ -87,19 +89,20 @@
*/
private PropertiesAccessor(IApplicationConfig cfg) throws AsterixException, IOException {
this.cfg = cfg;
- nodePartitionsMap = new HashMap<>();
+ nodePartitionsMap = new ConcurrentHashMap<>();
+ clusterPartitions = Collections.synchronizedSortedMap(new TreeMap<>());
extensions = new ArrayList<>();
// Determine whether to use old-style asterix-configuration.xml or new-style configuration.
// QQQ strip this out eventually
// QQQ this is NOT a good way to determine whether to use config file
- ConfigManager configManager = ((ConfigManagerApplicationConfig)cfg).getConfigManager();
+ ConfigManager configManager = ((ConfigManagerApplicationConfig) cfg).getConfigManager();
boolean usingConfigFile = Stream
.of((IOption) ControllerConfig.Option.CONFIG_FILE, ControllerConfig.Option.CONFIG_FILE_URL)
.map(configManager::get).anyMatch(Objects::nonNull);
AsterixConfiguration asterixConfiguration = null;
try {
- asterixConfiguration = configure(System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY,
- GlobalConfig.DEFAULT_CONFIG_FILE_NAME));
+ asterixConfiguration = configure(
+ System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY, GlobalConfig.DEFAULT_CONFIG_FILE_NAME));
} catch (Exception e) {
// cannot load config file, assume new-style config
}
@@ -123,6 +126,7 @@
// partition directory (as formed by appending the <store> subdirectory to
// each <iodevices> path from the user's original cluster.xml).
for (Store store : configuredStores) {
+ configManager.set(store.getNcId(), NodeProperties.Option.STARTING_PARTITION_ID, uniquePartitionId);
String trimmedStoreDirs = store.getStoreDirs().trim();
String[] nodeStores = trimmedStoreDirs.split(",");
ClusterPartition[] nodePartitions = new ClusterPartition[nodeStores.length];
@@ -153,8 +157,8 @@
continue;
}
if (option != null) {
- throw new IllegalStateException("ERROR: option found in multiple sections: " +
- Arrays.asList(option, optionTemp));
+ throw new IllegalStateException(
+ "ERROR: option found in multiple sections: " + Arrays.asList(option, optionTemp));
}
option = optionTemp;
}
@@ -175,12 +179,12 @@
MutableInt uniquePartitionId = new MutableInt(0);
// Iterate through each configured NC.
for (String ncName : cfg.getNCNames()) {
- configureNc(ncName, uniquePartitionId);
+ configureNc(configManager, ncName, uniquePartitionId);
}
for (String section : cfg.getSectionNames()) {
if (section.startsWith(AsterixProperties.SECTION_PREFIX_EXTENSION)) {
- String className = AsterixProperties.getSectionId(
- AsterixProperties.SECTION_PREFIX_EXTENSION, section);
+ String className = AsterixProperties.getSectionId(AsterixProperties.SECTION_PREFIX_EXTENSION,
+ section);
configureExtension(className, section);
}
}
@@ -230,15 +234,21 @@
extensions.add(new AsterixExtension(className, kvs));
}
- private void configureNc(String ncId, MutableInt uniquePartitionId) {
+ private void configureNc(ConfigManager configManager, String ncId, MutableInt uniquePartitionId)
+ throws AsterixException {
// Now we assign the coredump and txnlog directories for this node.
// QQQ Default values? Should they be specified here? Or should there
// be a default.ini? Certainly wherever they are, they should be platform-dependent.
IApplicationConfig nodeCfg = cfg.getNCEffectiveConfig(ncId);
coredumpConfig.put(ncId, nodeCfg.getString(NodeProperties.Option.CORE_DUMP_DIR));
- transactionLogDirs.put(ncId,
- nodeCfg.getString(NodeProperties.Option.TXN_LOG_DIR));
+ transactionLogDirs.put(ncId, nodeCfg.getString(NodeProperties.Option.TXN_LOG_DIR));
+ int partitionId = nodeCfg.getInt(NodeProperties.Option.STARTING_PARTITION_ID);
+ if (partitionId != -1) {
+ uniquePartitionId.setValue(partitionId);
+ } else {
+ configManager.set(ncId, NodeProperties.Option.STARTING_PARTITION_ID, uniquePartitionId.getValue());
+ }
// Now we create an array of ClusterPartitions for all the partitions
// on this NC.
@@ -250,9 +260,12 @@
// Construct final storage path from iodevice dir + storage subdirs
nodeStores[i] = iodevices[i] + File.separator + storageSubdir;
// Create ClusterPartition instances for this NC.
- ClusterPartition partition = new ClusterPartition(uniquePartitionId.getValue(), ncId, i);
- uniquePartitionId.increment();
- clusterPartitions.put(partition.getPartitionId(), partition);
+ ClusterPartition partition = new ClusterPartition(uniquePartitionId.getAndIncrement(), ncId, i);
+ ClusterPartition orig = clusterPartitions.put(partition.getPartitionId(), partition);
+ if (orig != null) {
+ throw AsterixException.create(ErrorCode.DUPLICATE_PARTITION_ID, partition.getPartitionId(), ncId,
+ orig.getNodeId());
+ }
nodePartitions[i] = partition;
}
stores.put(ncId, nodeStores);
@@ -302,8 +315,8 @@
return value == null ? defaultValue : interpreter.parse(value);
} catch (IllegalArgumentException e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe("Invalid property value '" + value + "' for property '" + property + "'.\n" +
- "Default = " + defaultValue);
+ LOGGER.severe("Invalid property value '" + value + "' for property '" + property + "'.\n" + "Default = "
+ + defaultValue);
}
throw e;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 35b7d4c..eb73e5e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -187,6 +187,9 @@
public static final int PROVIDER_STREAM_RECORD_READER_WRONG_CONFIGURATION = 3086;
public static final int FEED_CONNECT_FEED_APPLIED_INVALID_FUNCTION = 3087;
+ // Lifecycle management errors
+ public static final int DUPLICATE_PARTITION_ID = 4000;
+
private ErrorCode() {
}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 1f80fad..026f71a 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -174,4 +174,7 @@
3084 = Duplicate record reader format: %1$s
3085 = Unknown Adapter Name.
3086 = Cannot find record reader %1$s with specified configuration.
-3087 = Cannot find function %1$s
\ No newline at end of file
+3087 = Cannot find function %1$s
+
+# Lifecycle management errors
+4000 = Partition id %1$d for node %2$s already in use by node %3$s
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/RetainLogsRule.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/RetainLogsRule.java
index 39bd5e7..2077ad5 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/RetainLogsRule.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/RetainLogsRule.java
@@ -19,6 +19,9 @@
package org.apache.asterix.test.base;
import java.io.File;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.reflect.Method;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
@@ -26,15 +29,17 @@
public class RetainLogsRule extends TestWatcher {
private final File baseDir;
private final File destDir;
+ private final Object instance;
private long startTime;
- public RetainLogsRule(File baseDir, File destDir) {
+ public RetainLogsRule(File baseDir, File destDir, Object instance) {
this.baseDir = baseDir;
this.destDir = destDir;
+ this.instance = instance;
}
- public RetainLogsRule(String baseDir, String destDir) {
- this(new File(baseDir), new File(destDir));
+ public RetainLogsRule(String baseDir, String destDir, Object instance) {
+ this(new File(baseDir), new File(destDir), instance);
}
@Override
@@ -44,7 +49,7 @@
@Override
protected void failed(Throwable e, Description description) {
- File reportDir = new File(destDir, description.getTestClass().getName() + "." + description.getMethodName());
+ File reportDir = new File(destDir, description.getTestClass().getSimpleName() + "." + description.getMethodName());
reportDir.mkdirs();
try {
AsterixTestHelper.deepSelectiveCopy(baseDir, reportDir,
@@ -54,4 +59,23 @@
e1.printStackTrace();
}
}
+
+ @Override
+ protected void finished(Description description) {
+ if (instance != null) {
+ for (Method m : instance.getClass().getMethods()) {
+ if (m.isAnnotationPresent(After.class)) {
+ try {
+ m.invoke(instance);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ }
+
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface After {
+ }
}
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
index 9d0a1db..06e5aed 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AbstractExecutionIT.java
@@ -67,7 +67,7 @@
@Rule
public TestRule retainLogs = new RetainLogsRule(
- AsterixInstallerIntegrationUtil.getManagixHome(), reportPath);
+ AsterixInstallerIntegrationUtil.getManagixHome(), reportPath, this);
@BeforeClass
public static void setUp() throws Exception {
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
index 6e9dd44..c379517 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixExternalLibraryIT.java
@@ -50,7 +50,7 @@
@Rule
public TestRule retainLogs = new RetainLogsRule(
- AsterixInstallerIntegrationUtil.getManagixHome(), reportPath);
+ AsterixInstallerIntegrationUtil.getManagixHome(), reportPath, this);
@BeforeClass
public static void setUp() throws Exception {
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixRestartIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixRestartIT.java
index ce4de7c..d3fdc4a 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixRestartIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixRestartIT.java
@@ -63,7 +63,7 @@
}
@Rule
- public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath);
+ public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath, this);
@BeforeClass
public static void setUp() throws Exception {
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/MetadataReplicationIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/MetadataReplicationIT.java
index 7a0a797..7fe156a 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/MetadataReplicationIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/MetadataReplicationIT.java
@@ -26,11 +26,10 @@
import java.util.logging.Logger;
import org.apache.asterix.event.model.AsterixInstance.State;
-import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.test.base.RetainLogsRule;
+import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.commons.lang3.StringUtils;
-import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -62,7 +61,7 @@
}
@Rule
- public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath);
+ public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath, this);
@BeforeClass
public static void setUp() throws Exception {
@@ -89,7 +88,7 @@
LOGGER.info("Instance is in ACTIVE state.");
}
- @After
+ @RetainLogsRule.After
public void after() throws Exception {
LOGGER.info("Destroying instance...");
AsterixInstallerIntegrationUtil.deinit();
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
index 44ce7f8..21f382b 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
@@ -29,7 +29,6 @@
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.hyracks.util.file.FileUtil;
-import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -60,7 +59,7 @@
}
@Rule
- public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath);
+ public TestRule retainLogs = new RetainLogsRule(AsterixInstallerIntegrationUtil.getManagixHome(), reportPath, this);
@BeforeClass
public static void setUp() throws Exception {
@@ -86,7 +85,7 @@
LOGGER.info("Instance is in ACTIVE state.");
}
- @After
+ @RetainLogsRule.After
public void after() throws Exception {
LOGGER.info("Destroying instance...");
AsterixInstallerIntegrationUtil.deinit();
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/DmlRecoveryIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/DmlRecoveryIT.java
index a2968a5..4f04e79 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/DmlRecoveryIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/DmlRecoveryIT.java
@@ -60,7 +60,7 @@
private final TestExecutor testExecutor = new TestExecutor();
@Rule
- public TestRule retainLogs = new RetainLogsRule(managixHomePath, reportPath);
+ public TestRule retainLogs = new RetainLogsRule(managixHomePath, reportPath, this);
@BeforeClass
public static void setUp() throws Exception {
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/RecoveryIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/RecoveryIT.java
index a0612cc..0e7ac8b 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/RecoveryIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/transaction/RecoveryIT.java
@@ -58,7 +58,7 @@
private final TestExecutor testExecutor = new TestExecutor();
@Rule
- public TestRule retainLogs = new RetainLogsRule(managixHomePath, reportPath);
+ public TestRule retainLogs = new RetainLogsRule(managixHomePath, reportPath, this);
@BeforeClass
public static void setUp() throws Exception {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 48937f8..64ef5c2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -19,6 +19,7 @@
package org.apache.asterix.runtime.utils;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -35,6 +36,8 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
@@ -249,8 +252,8 @@
clusterActiveLocations.add(p.getActiveNodeId());
}
}
- clusterPartitionConstraint =
- new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {}));
+ clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint(
+ clusterActiveLocations.toArray(new String[] {}));
}
public boolean isGlobalRecoveryCompleted() {
@@ -350,4 +353,34 @@
public String getCurrentMetadataNodeId() {
return currentMetadataNode;
}
+
+ @Override
+ public synchronized void registerNodePartitions(String nodeId, ClusterPartition[] nodePartitions)
+ throws AsterixException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Registering node partitions for node " + nodeId + ": " + Arrays.toString(nodePartitions));
+ }
+ // We want to make sure there are no conflicts; make two passes for simplicity...
+ for (ClusterPartition nodePartition : nodePartitions) {
+ if (clusterPartitions.containsKey(nodePartition.getPartitionId())) {
+ throw AsterixException.create(ErrorCode.DUPLICATE_PARTITION_ID, nodePartition.getPartitionId(), nodeId,
+ clusterPartitions.get(nodePartition.getPartitionId()).getNodeId());
+ }
+ }
+ for (ClusterPartition nodePartition : nodePartitions) {
+ clusterPartitions.put(nodePartition.getPartitionId(), nodePartition);
+ }
+ node2PartitionsMap.put(nodeId, nodePartitions);
+ }
+
+ @Override
+ public synchronized void deregisterNodePartitions(String nodeId) {
+ ClusterPartition [] nodePartitions = node2PartitionsMap.remove(nodeId);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Deegistering node partitions for node " + nodeId + ": " + Arrays.toString(nodePartitions));
+ }
+ for (ClusterPartition nodePartition : nodePartitions) {
+ clusterPartitions.remove(nodePartition.getPartitionId());
+ }
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
index b1f0892..64e328a 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
@@ -31,7 +31,9 @@
private final int errorCode;
private final Serializable[] params;
private final String nodeId;
- private transient volatile String msgCache;
+
+ @SuppressWarnings("squid:S1165") // exception class not final
+ private transient CachedMessage msgCache;
public AlgebricksException(String component, int errorCode, String message, Throwable cause, String nodeId,
Serializable... params) {
@@ -42,16 +44,12 @@
this.params = params;
}
- public static AlgebricksException create(int errorCode, Serializable... params) {
- return new AlgebricksException(ErrorCode.HYRACKS, errorCode, ErrorCode.getErrorMessage(errorCode), params);
- }
-
/**
* @deprecated Error code is needed.
*/
@Deprecated
public AlgebricksException(String message) {
- this(ErrorMessageUtil.NONE, UNKNOWN, message, null, null);
+ this(ErrorMessageUtil.NONE, UNKNOWN, message, null, (Serializable[]) null);
}
/**
@@ -59,23 +57,7 @@
*/
@Deprecated
public AlgebricksException(Throwable cause) {
- this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, null);
- }
-
- /**
- * @deprecated Error code is needed.
- */
- @Deprecated
- public AlgebricksException(Throwable cause, String nodeId) {
- this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, nodeId);
- }
-
- /**
- * @deprecated Error code is needed.
- */
- @Deprecated
- public AlgebricksException(String message, Throwable cause, String nodeId) {
- this(ErrorMessageUtil.NONE, UNKNOWN, message, cause, nodeId);
+ this(ErrorMessageUtil.NONE, UNKNOWN, cause.getMessage(), cause, (Serializable[]) null);
}
/**
@@ -107,6 +89,10 @@
this(component, errorCode, message, cause, null, params);
}
+ public static AlgebricksException create(int errorCode, Serializable... params) {
+ return new AlgebricksException(ErrorCode.HYRACKS, errorCode, ErrorCode.getErrorMessage(errorCode), params);
+ }
+
public String getComponent() {
return component;
}
@@ -122,10 +108,17 @@
@Override
public String getMessage() {
if (msgCache == null) {
- synchronized (this) {
- msgCache = ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params);
- }
+ msgCache = new CachedMessage(
+ ErrorMessageUtil.formatMessage(component, errorCode, super.getMessage(), params));
}
- return msgCache;
+ return msgCache.message;
+ }
+
+ private static class CachedMessage {
+ private final String message;
+
+ private CachedMessage(String message) {
+ this.message = message;
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ErrorMessageUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ErrorMessageUtil.java
index 390e2b5..467d148 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ErrorMessageUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ErrorMessageUtil.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.api.util;
-import java.io.IOError;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
@@ -30,8 +29,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.api.exceptions.ErrorCode;
-
public class ErrorMessageUtil {
private static final Logger LOGGER = Logger.getLogger(ErrorMessageUtil.class.getName());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index d6d8bc4..47e78a3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -84,19 +84,14 @@
}
// Updates the node registry.
if (nodeRegistry.containsKey(nodeId)) {
- LOGGER.warning("Node with name " + nodeId + " has already registered.");
- return;
+ LOGGER.warning("Node with name " + nodeId + " has already registered; re-registering");
}
nodeRegistry.put(nodeId, ncState);
// Updates the IP address to node names map.
try {
InetAddress ipAddress = getIpAddress(ncState);
- Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
- if (nodes == null) {
- nodes = new HashSet<>();
- ipAddressNodeNameMap.put(ipAddress, nodes);
- }
+ Set<String> nodes = ipAddressNodeNameMap.computeIfAbsent(ipAddress, k -> new HashSet<>());
nodes.add(nodeId);
} catch (HyracksException e) {
// If anything fails, we ignore the node.
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index 45763fa..c11deef 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.http.server.utils;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
@@ -74,6 +75,10 @@
return request.method() == HttpMethod.POST ? PostRequest.create(request) : BaseRequest.create(request);
}
+ public static String getRequestBody(IServletRequest request) {
+ return request.getHttpRequest().content().toString(StandardCharsets.UTF_8);
+ }
+
public static void setContentType(IServletResponse response, String type, String charset) throws IOException {
response.setHeader(HttpHeaderNames.CONTENT_TYPE, type + "; charset=" + charset);
}