Wait For Metadata Registration Before Active, Etc.
Also:
- metadata.port moved to [app]; honored
- += metadata.node to [app] to optionally specify metadata node
- += metadata.callback.port to [app]
- Decrease timeout for metadata registration from 7 days to default of
one minute, configurable with property
- Log swallowed exception in SynchronizableWork
- Add missing properties (metadata) to cluster state http api
- Make AsterixPropertiesAccessor, to ensure consistent values when
accessed in virtual cluster
Change-Id: I48d7c10b3e43181ec307f7d890ba721f61bc2ab0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1247
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 392bec8..b18b669 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -54,7 +54,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index fc67d3c..e4a57e6 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -65,7 +65,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
ActiveLifecycleListener.INSTANCE.receive(this);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index dc0087b..7750ab0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -64,7 +64,7 @@
public void init(boolean deleteOldInstanceData) throws Exception {
ncs = new NodeControllerService[0]; // ensure that ncs is not null
- propertiesAccessor = new AsterixPropertiesAccessor();
+ propertiesAccessor = AsterixPropertiesAccessor.getInstance();
if (deleteOldInstanceData) {
deleteTransactionLogs();
removeTestStorageFiles();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
index ed081b5..c2c214c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
@@ -75,7 +75,6 @@
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
-import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
@@ -134,26 +133,17 @@
private IReplicationManager replicationManager;
private IRemoteRecoveryManager remoteRecoveryManager;
private IReplicaResourcesManager replicaResourcesManager;
- private final int metadataRmiPort;
private final ILibraryManager libraryManager;
private final NCExtensionManager ncExtensionManager;
- public AsterixNCAppRuntimeContext(INCApplicationContext ncApplicationContext, int metadataRmiPort,
- List<AsterixExtension> extensions) throws AsterixException, InstantiationException, IllegalAccessException,
+ public AsterixNCAppRuntimeContext(INCApplicationContext ncApplicationContext, List<AsterixExtension> extensions)
+ throws AsterixException, InstantiationException, IllegalAccessException,
ClassNotFoundException, IOException {
List<AsterixExtension> allExtensions = new ArrayList<>();
this.ncApplicationContext = ncApplicationContext;
- // Determine whether to use old-style asterix-configuration.xml or new-style configuration.
- // QQQ strip this out eventually
- AsterixPropertiesAccessor propertiesAccessor;
- IApplicationConfig cfg = ncApplicationContext.getAppConfig();
- // QQQ this is NOT a good way to determine whether the config is valid
- if (cfg.getString("cc", "cluster.address") != null) {
- propertiesAccessor = new AsterixPropertiesAccessor(cfg);
- } else {
- propertiesAccessor = new AsterixPropertiesAccessor();
- }
+ AsterixPropertiesAccessor propertiesAccessor =
+ AsterixPropertiesAccessor.getInstance(ncApplicationContext.getAppConfig());
compilerProperties = new AsterixCompilerProperties(propertiesAccessor);
externalProperties = new AsterixExternalProperties(propertiesAccessor);
metadataProperties = new AsterixMetadataProperties(propertiesAccessor);
@@ -163,7 +153,6 @@
buildProperties = new AsterixBuildProperties(propertiesAccessor);
replicationProperties = new AsterixReplicationProperties(propertiesAccessor);
messagingProperties = new MessagingProperties(propertiesAccessor);
- this.metadataRmiPort = metadataRmiPort;
libraryManager = new ExternalLibraryManager();
if (extensions != null) {
allExtensions.addAll(extensions);
@@ -458,7 +447,7 @@
// This is a special case, we just give the metadataNode directly.
// This way we can delay the registration of the metadataNode until
// it is completely initialized.
- MetadataManager.instantiate(new MetadataManager(proxy, MetadataNode.INSTANCE));
+ MetadataManager.initialize(proxy, MetadataNode.INSTANCE);
MetadataBootstrap.startUniverse(this, ncApplicationContext, newUniverse);
MetadataBootstrap.startDDLRecovery();
ncExtensionManager.initializeMetadata();
@@ -470,7 +459,8 @@
@Override
public void exportMetadataNodeStub() throws RemoteException {
- IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
+ IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
+ getMetadataProperties().getMetadataPort());
((IAsterixStateProxy) ncApplicationContext.getDistributedState()).setMetadataNode(stub);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 9120aa5..764b559 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -71,6 +71,7 @@
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.CCConfig;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
@@ -88,7 +89,8 @@
@Override
public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
- IMessageBroker messageBroker = new CCMessageBroker((ClusterControllerService) ccAppCtx.getControllerService());
+ final ClusterControllerService controllerService = (ClusterControllerService) ccAppCtx.getControllerService();
+ IMessageBroker messageBroker = new CCMessageBroker(controllerService);
this.appCtx = ccAppCtx;
if (LOGGER.isLoggable(Level.INFO)) {
@@ -101,20 +103,21 @@
AsterixResourceIdManager resourceIdManager = new AsterixResourceIdManager();
ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), GlobalRecoveryManager.instance(),
- libraryManager, resourceIdManager);
+ libraryManager, resourceIdManager, () -> MetadataManager.INSTANCE);
ccExtensionManager = new CompilerExtensionManager(getExtensions());
AsterixAppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager);
- if (System.getProperty("java.rmi.server.hostname") == null) {
- System.setProperty("java.rmi.server.hostname",
- ((ClusterControllerService) ccAppCtx.getControllerService()).getCCConfig().clusterNetIpAddress);
- }
+ final CCConfig ccConfig = controllerService.getCCConfig();
- setAsterixStateProxy(AsterixStateProxy.registerRemoteObject());
+ if (System.getProperty("java.rmi.server.hostname") == null) {
+ System.setProperty("java.rmi.server.hostname", ccConfig.clusterNetIpAddress);
+ }
+ AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.INSTANCE.getMetadataProperties();
+
+ setAsterixStateProxy(AsterixStateProxy.registerRemoteObject(metadataProperties.getMetadataCallbackPort()));
appCtx.setDistributedState(proxy);
- AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.INSTANCE.getMetadataProperties();
- MetadataManager.instantiate(new MetadataManager(proxy, metadataProperties));
+ MetadataManager.initialize(proxy, metadataProperties);
AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
.addJobLifecycleListener(ActiveLifecycleListener.INSTANCE);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 4eaab2d..75cbe44 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -42,6 +42,7 @@
import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse;
import org.apache.asterix.runtime.util.ClusterStateManager;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
+import org.apache.hyracks.api.exceptions.HyracksException;
public class ClusterLifecycleListener implements IClusterLifecycleListener {
@@ -64,13 +65,16 @@
}
@Override
- public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) {
+ public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("NC: " + nodeId + " joined");
}
ClusterStateManager.INSTANCE.addNCConfiguration(nodeId, ncConfiguration);
+
//if metadata node rejoining, we need to rebind the proxy connection when it is active again.
- MetadataManager.INSTANCE.rebindMetadataNode = !ClusterStateManager.INSTANCE.isMetadataNodeActive();
+ if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) {
+ MetadataManager.INSTANCE.rebindMetadataNode();
+ }
Set<String> nodeAddition = new HashSet<String>();
nodeAddition.add(nodeId);
@@ -88,7 +92,7 @@
}
@Override
- public void notifyNodeFailure(Set<String> deadNodeIds) {
+ public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException {
for (String deadNode : deadNodeIds) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("NC: " + deadNode + " left");
@@ -96,7 +100,9 @@
ClusterStateManager.INSTANCE.removeNCConfiguration(deadNode);
//if metadata node failed, we need to rebind the proxy connection when it is active again
- MetadataManager.INSTANCE.rebindMetadataNode = !ClusterStateManager.INSTANCE.isMetadataNodeActive();
+ if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) {
+ MetadataManager.INSTANCE.rebindMetadataNode();
+ }
}
updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds);
Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index f26afa8..e6f3142 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -61,10 +61,6 @@
public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName());
- @Option(name = "-metadata-port", usage = "IP port to bind metadata listener (default: random port)",
- required = false)
- public int metadataRmiPort = 0;
-
@Option(name = "-initial-run",
usage = "A flag indicating if it's the first time the NC is started (default: false)", required = false)
public boolean initialRun = false;
@@ -94,7 +90,6 @@
parser.printUsage(System.err);
throw e;
}
-
ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getThreadFactory(),
ncAppCtx.getLifeCycleComponentManager()));
ncApplicationContext = ncAppCtx;
@@ -103,11 +98,13 @@
LOGGER.info("Starting Asterix node controller: " + nodeId);
}
+ final NodeControllerService controllerService = (NodeControllerService) ncAppCtx.getControllerService();
+
if (System.getProperty("java.rmi.server.hostname") == null) {
- System.setProperty("java.rmi.server.hostname", ((NodeControllerService) ncAppCtx.getControllerService())
+ System.setProperty("java.rmi.server.hostname", (controllerService)
.getConfiguration().clusterNetPublicIPAddress);
}
- runtimeContext = new AsterixNCAppRuntimeContext(ncApplicationContext, metadataRmiPort, getExtensions());
+ runtimeContext = new AsterixNCAppRuntimeContext(ncApplicationContext, getExtensions());
AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
.getMetadataProperties();
if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {
@@ -120,8 +117,7 @@
ncApplicationContext.setApplicationObject(runtimeContext);
MessagingProperties messagingProperties = ((IAsterixPropertiesProvider) runtimeContext)
.getMessagingProperties();
- messageBroker = new NCMessageBroker((NodeControllerService) ncAppCtx.getControllerService(),
- messagingProperties);
+ messageBroker = new NCMessageBroker(controllerService, messagingProperties);
ncApplicationContext.setMessageBroker(messageBroker);
MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory(
(NCMessageBroker) messageBroker, messagingProperties);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm
index ea002ab..d076f74 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm
@@ -7,20 +7,53 @@
"config": {
"api.port": 19002,
"cc.java.opts": "-Xmx1024m",
+ "cluster.partitions": {
+ "0": "ID:0, Original Node: asterix_nc1, IODevice: 0, Active Node: asterix_nc1",
+ "1": "ID:1, Original Node: asterix_nc1, IODevice: 1, Active Node: asterix_nc1",
+ "2": "ID:2, Original Node: asterix_nc2, IODevice: 0, Active Node: asterix_nc2",
+ "3": "ID:3, Original Node: asterix_nc2, IODevice: 1, Active Node: asterix_nc2"
+ },
"compiler.framesize": 32768,
"compiler.groupmemory": 163840,
"compiler.joinmemory": 163840,
"compiler.pregelix.home": "~/pregelix",
"compiler.sortmemory": 327680,
+ "core.dump.paths": {},
"feed.central.manager.port": 4500,
"feed.max.threshold.period": 5,
"feed.memory.available.wait.timeout": 10,
"feed.memory.global.budget": 67108864,
"feed.pending.work.threshold": 50,
"feed.port": 19003,
+ "instance.name": null,
"log.level": "INFO",
"max.wait.active.cluster": 60,
+ "metadata.callback.port": 0,
+ "metadata.node": "asterix_nc1",
+ "metadata.partition": "ID:0, Original Node: asterix_nc1, IODevice: 0, Active Node: asterix_nc1",
+ "metadata.port": 0,
+ "metadata.registration.timeout.secs": 60,
"nc.java.opts": "-Xmx1024m",
+ "node.partitions": {
+ "asterix_nc1": [
+ "ID:0, Original Node: asterix_nc1, IODevice: 0, Active Node: asterix_nc1",
+ "ID:1, Original Node: asterix_nc1, IODevice: 1, Active Node: asterix_nc1"
+ ],
+ "asterix_nc2": [
+ "ID:2, Original Node: asterix_nc2, IODevice: 0, Active Node: asterix_nc2",
+ "ID:3, Original Node: asterix_nc2, IODevice: 1, Active Node: asterix_nc2"
+ ]
+ },
+ "node.stores": {
+ "asterix_nc1": [
+ "iodevice0",
+ "iodevice1"
+ ],
+ "asterix_nc2": [
+ "iodevice0",
+ "iodevice1"
+ ]
+ },
"plot.activate": false,
"replication.enabled": false,
"replication.factor": 2,
@@ -38,6 +71,10 @@
"storage.memorycomponent.numpages": 8,
"storage.memorycomponent.pagesize": 131072,
"storage.metadata.memorycomponent.numpages": 256,
+ "transaction.log.dirs": {
+ "asterix_nc1": "target/txnLogDir/asterix_nc1",
+ "asterix_nc2": "target/txnLogDir/asterix_nc2"
+ },
"txn.commitprofiler.reportinterval": 5,
"txn.job.recovery.memorysize": 67108864,
"txn.lock.escalationthreshold": 1000,
@@ -95,4 +132,4 @@
"shutdownUri": "http://127.0.0.1:19002/admin/shutdown",
"state": "ACTIVE",
"versionUri": "http://127.0.0.1:19002/admin/version"
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index adf8e38..323df65 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -27,6 +27,7 @@
public enum ClusterState {
STARTING,
+ PENDING,
ACTIVE,
UNUSABLE,
REBALANCING
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index 6cd44a7..cc27fbb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -57,17 +57,16 @@
@Override
public ClusterPartition clone() {
- ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum);
- return clone;
+ return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("ID:" + partitionId);
- sb.append(" Original Node: " + nodeId);
- sb.append(" IODevice: " + ioDeviceNum);
- sb.append(" Active Node: " + activeNodeId);
+ sb.append(", Original Node: " + nodeId);
+ sb.append(", IODevice: " + ioDeviceNum);
+ sb.append(", Active Node: " + activeNodeId);
return sb.toString();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index 677fc78..3584f2b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -26,22 +26,35 @@
public class AsterixMetadataProperties extends AbstractAsterixProperties {
+ private static final String METADATA_REGISTRATION_TIMEOUT_KEY = "metadata.registration.timeout.secs";
+ private static final long METADATA_REGISTRATION_TIMEOUT_DEFAULT = 60;
+
+ private static final String METADATA_PORT_KEY = "metadata.port";
+ private static final int METADATA_PORT_DEFAULT = 0;
+
+ private static final String METADATA_CALLBACK_PORT_KEY = "metadata.callback.port";
+ private static final int METADATA_CALLBACK_PORT_DEFAULT = 0;
+
public AsterixMetadataProperties(AsterixPropertiesAccessor accessor) {
super(accessor);
}
+ @PropertyKey("instance.name")
public String getInstanceName() {
return accessor.getInstanceName();
}
+ @PropertyKey("metadata.node")
public String getMetadataNodeName() {
return accessor.getMetadataNodeName();
}
+ @PropertyKey("metadata.partition")
public ClusterPartition getMetadataPartition() {
return accessor.getMetadataPartition();
}
+ @PropertyKey("node.stores")
public Map<String, String[]> getStores() {
return accessor.getStores();
}
@@ -54,19 +67,41 @@
return accessor.getCoredumpPath(nodeId);
}
+ @PropertyKey("core.dump.paths")
public Map<String, String> getCoredumpPaths() {
return accessor.getCoredumpConfig();
}
+ @PropertyKey("node.partitions")
public Map<String, ClusterPartition[]> getNodePartitions() {
return accessor.getNodePartitions();
}
+ @PropertyKey("cluster.partitions")
public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
return accessor.getClusterPartitions();
}
+ @PropertyKey("transaction.log.dirs")
public Map<String, String> getTransactionLogDirs() {
return accessor.getTransactionLogDirs();
}
+
+ @PropertyKey(METADATA_REGISTRATION_TIMEOUT_KEY)
+ public long getRegistrationTimeoutSecs() {
+ return accessor.getProperty(METADATA_REGISTRATION_TIMEOUT_KEY, METADATA_REGISTRATION_TIMEOUT_DEFAULT,
+ PropertyInterpreters.getLongPropertyInterpreter());
+ }
+
+ @PropertyKey(METADATA_PORT_KEY)
+ public int getMetadataPort() {
+ return accessor.getProperty(METADATA_PORT_KEY, METADATA_PORT_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
+ @PropertyKey(METADATA_CALLBACK_PORT_KEY)
+ public int getMetadataCallbackPort() {
+ return accessor.getProperty(METADATA_CALLBACK_PORT_KEY, METADATA_CALLBACK_PORT_DEFAULT,
+ PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java
index 1576774..3ae2bd9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java
@@ -34,7 +34,7 @@
public static final String PROPERTY_CLUSTER_ADDRESS = "cluster.address";
public static final String PROPERTY_INSTANCE_NAME = "instance";
public static final String DEFAULT_INSTANCE_NAME = "DEFAULT_INSTANCE";
- public static final String PROPERTY_METADATA_PORT = "metadata.port";
+ public static final String PROPERTY_METADATA_NODE = "metadata.node";
public static final String PROPERTY_COREDUMP_DIR = "coredumpdir";
public static final String DEFAULT_COREDUMP_DIR = String.join(File.separator, ASTERIXDB, "coredump");
public static final String PROPERTY_TXN_LOG_DIR = "txnlogdir";
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 61cb618..a12d802 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -31,6 +31,7 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
@@ -54,6 +55,7 @@
public class AsterixPropertiesAccessor {
private static final Logger LOGGER = Logger.getLogger(AsterixPropertiesAccessor.class.getName());
+ private static final AtomicReference<AsterixPropertiesAccessor> instanceHolder = new AtomicReference<>();
private final String instanceName;
private final String metadataNodeName;
private final List<String> nodeNames = new ArrayList<>();;
@@ -76,7 +78,7 @@
* @throws AsterixException
* @throws IOException
*/
- public AsterixPropertiesAccessor() throws AsterixException, IOException {
+ private AsterixPropertiesAccessor() throws AsterixException, IOException {
String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
if (fileName == null) {
fileName = GlobalConfig.DEFAULT_CONFIG_FILE_NAME;
@@ -164,25 +166,24 @@
/**
* Constructor which wraps an IApplicationConfig.
*/
- public AsterixPropertiesAccessor(IApplicationConfig cfg) throws AsterixException {
+ private AsterixPropertiesAccessor(IApplicationConfig cfg) throws AsterixException {
this.cfg = cfg;
instanceName = cfg.getString(AsterixProperties.SECTION_ASTERIX, AsterixProperties.PROPERTY_INSTANCE_NAME,
AsterixProperties.DEFAULT_INSTANCE_NAME);
- String mdNode = null;
nodePartitionsMap = new HashMap<>();
MutableInt uniquePartitionId = new MutableInt(0);
extensions = new ArrayList<>();
// Iterate through each configured NC.
for (String section : cfg.getSections()) {
if (section.startsWith(AsterixProperties.SECTION_PREFIX_NC)) {
- mdNode = configureNc(section, mdNode, uniquePartitionId);
+ configureNc(section, uniquePartitionId);
} else if (section.startsWith(AsterixProperties.SECTION_PREFIX_EXTENSION)) {
String className = AsterixProperties.getSectionId(AsterixProperties.SECTION_PREFIX_EXTENSION, section);
configureExtension(className, section);
}
}
-
- metadataNodeName = mdNode;
+ metadataNodeName = getProperty(AsterixProperties.PROPERTY_METADATA_NODE,
+ nodeNames.isEmpty() ? "" : nodeNames.get(0), PropertyInterpreters.getStringPropertyInterpreter());
asterixConfigurationParams = null;
loadAsterixBuildProperties();
}
@@ -197,16 +198,8 @@
extensions.add(new AsterixExtension(className, kvs));
}
- private String configureNc(String section, String mdNode, MutableInt uniquePartitionId) {
+ private void configureNc(String section, MutableInt uniquePartitionId) {
String ncId = AsterixProperties.getSectionId(AsterixProperties.SECTION_PREFIX_NC, section);
- String newMetadataNode = mdNode;
-
- // Here we figure out which is the metadata node. If any NCs
- // declare "metadata.port", use that one; otherwise just use the first.
- if (mdNode == null || cfg.getString(section, AsterixProperties.PROPERTY_METADATA_PORT) != null) {
- // QQQ But we don't actually *honor* metadata.port yet!
- newMetadataNode = ncId;
- }
// Now we assign the coredump and txnlog directories for this node.
// QQQ Default values? Should they be specified here? Or should there
@@ -225,7 +218,7 @@
String[] nodeStores = new String[iodevices.length];
ClusterPartition[] nodePartitions = new ClusterPartition[iodevices.length];
for (int i = 0; i < nodePartitions.length; i++) {
- // Construct final storage path from iodevice dir + storage subdir.
+ // Construct final storage path from iodevice dir + storage subdir.s
nodeStores[i] = iodevices[i] + File.separator + storageSubdir;
// Create ClusterPartition instances for this NC.
ClusterPartition partition = new ClusterPartition(uniquePartitionId.getValue(), ncId, i);
@@ -236,7 +229,6 @@
stores.put(ncId, nodeStores);
nodePartitionsMap.put(ncId, nodePartitions);
nodeNames.add(ncId);
- return newMetadataNode;
}
private void loadAsterixBuildProperties() throws AsterixException {
@@ -334,4 +326,24 @@
public List<AsterixExtension> getExtensions() {
return extensions;
}
+
+ public static AsterixPropertiesAccessor getInstance(IApplicationConfig cfg) throws IOException, AsterixException {
+ // Determine whether to use old-style asterix-configuration.xml or new-style configuration.
+ // QQQ strip this out eventually
+ // QQQ this is NOT a good way to determine whether the config is valid
+ AsterixPropertiesAccessor propertiesAccessor;
+ if (cfg != null && cfg.getString("cc", "cluster.address") != null) {
+ propertiesAccessor = new AsterixPropertiesAccessor(cfg);
+ } else {
+ propertiesAccessor = new AsterixPropertiesAccessor();
+ }
+ if (!instanceHolder.compareAndSet(null, propertiesAccessor)) {
+ propertiesAccessor = instanceHolder.get();
+ }
+ return propertiesAccessor;
+ }
+
+ public static AsterixPropertiesAccessor getInstance() throws IOException, AsterixException {
+ return getInstance(null);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index dbd2139..6e8c4cf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -28,5 +28,5 @@
/**
* handle the message upon delivery
*/
- public void handle(IControllerService cs) throws HyracksDataException, InterruptedException;
+ void handle(IControllerService cs) throws HyracksDataException, InterruptedException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataBootstrap.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataBootstrap.java
new file mode 100644
index 0000000..940ec60
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataBootstrap.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.metadata;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface IMetadataBootstrap {
+ /**
+ * Initializes the metadata manager, e.g., finds the remote metadata node.
+ */
+ void init() throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
index d1efe11..8a3392a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
@@ -18,26 +18,47 @@
*/
package org.apache.asterix.metadata;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
/**
* Periodically recycle temporary datasets.
*
* @author yingyib
*/
public class GarbageCollector implements Runnable {
+ private static final Logger LOGGER = Logger.getLogger(GarbageCollector.class.getName());
- private static long CLEANUP_PERIOD = 3600 * 24;
+ private static final long CLEANUP_PERIOD = 3600L * 24;
- @Override
- public void run() {
- try {
- synchronized (this) {
- this.wait(CLEANUP_PERIOD);
- }
- MetadataManager.INSTANCE.cleanupTempDatasets();
- } catch (Exception e) {
- // Prints the stack trace to log.
- e.printStackTrace();
- }
+ static {
+ // Starts the garbage collector thread which
+ // should always be running.
+ Thread gcThread = new Thread(new GarbageCollector(), "Metadata GC");
+ gcThread.setDaemon(true);
+ gcThread.start();
}
+ @Override
+ @SuppressWarnings("squid:S2142") // rethrow or interrupt thread on InterruptedException
+ public void run() {
+ LOGGER.info("Starting Metadata GC");
+ while (true) {
+ try {
+ synchronized (this) {
+ this.wait(CLEANUP_PERIOD);
+ }
+ MetadataManager.INSTANCE.cleanupTempDatasets();
+ } catch (InterruptedException e) {
+ break;
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Exception cleaning temp datasets", e);
+ }
+ }
+ LOGGER.info("Exiting Metadata GC");
+ }
+
+ public static void ensure() {
+ // no need to do anything, <clinit> does the work
+ }
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 70da097..6a324a1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -22,6 +22,7 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -49,6 +50,7 @@
import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* Provides access to Asterix metadata via remote methods to the metadata node.
@@ -83,71 +85,34 @@
* with transaction ids of regular jobs or other metadata transactions.
*/
public class MetadataManager implements IMetadataManager {
- private static final int INITIAL_SLEEP_TIME = 64;
- private static final int RETRY_MULTIPLIER = 5;
- private static final int MAX_RETRY_COUNT = 10;
-
- // Set in init().
- public static MetadataManager INSTANCE;
private final MetadataCache cache = new MetadataCache();
- private final IAsterixStateProxy proxy;
- private IMetadataNode metadataNode;
+ protected final IAsterixStateProxy proxy;
+ protected IMetadataNode metadataNode;
private final ReadWriteLock metadataLatch;
- private final AsterixMetadataProperties metadataProperties;
- public boolean rebindMetadataNode = false;
+ protected boolean rebindMetadataNode = false;
- public MetadataManager(IAsterixStateProxy proxy, AsterixMetadataProperties metadataProperties) {
- if (proxy == null) {
- throw new Error("Null proxy given to MetadataManager.");
+ // TODO(mblow): replace references of this (non-constant) field with a method, update field name accordingly
+ public static IMetadataManager INSTANCE;
+
+ private MetadataManager(IAsterixStateProxy proxy, IMetadataNode metadataNode) {
+ this(proxy);
+ if (metadataNode == null) {
+ throw new IllegalArgumentException("Null metadataNode given to MetadataManager");
}
- this.proxy = proxy;
- this.metadataProperties = metadataProperties;
- this.metadataNode = null;
- this.metadataLatch = new ReentrantReadWriteLock(true);
+ this.metadataNode = metadataNode;
}
- public MetadataManager(IAsterixStateProxy proxy, IMetadataNode metadataNode) {
- if (metadataNode == null) {
- throw new Error("Null metadataNode given to MetadataManager.");
+ private MetadataManager(IAsterixStateProxy proxy) {
+ if (proxy == null) {
+ throw new IllegalArgumentException("Null proxy given to MetadataManager");
}
this.proxy = proxy;
- this.metadataProperties = null;
- this.metadataNode = metadataNode;
this.metadataLatch = new ReentrantReadWriteLock(true);
}
@Override
- public void init() throws RemoteException, MetadataException {
- // Could be synchronized on any object. Arbitrarily chose proxy.
- synchronized (proxy) {
- if (metadataNode != null && !rebindMetadataNode) {
- return;
- }
- try {
- int retry = 0;
- int sleep = INITIAL_SLEEP_TIME;
- while (retry++ < MAX_RETRY_COUNT) {
- metadataNode = proxy.getMetadataNode();
- if (metadataNode != null) {
- rebindMetadataNode = false;
- break;
- }
- Thread.sleep(sleep);
- sleep *= RETRY_MULTIPLIER;
- }
- } catch (InterruptedException e) {
- throw new MetadataException(e);
- }
- if (metadataNode == null) {
- throw new Error("Failed to get the MetadataNode.\n" + "The MetadataNode was configured to run on NC: "
- + metadataProperties.getMetadataNodeName());
- }
- }
-
- // Starts the garbage collector thread which
- // should always be running.
- Thread garbageCollectorThread = new Thread(new GarbageCollector());
- garbageCollectorThread.start();
+ public void init() throws HyracksDataException {
+ GarbageCollector.ensure();
}
@Override
@@ -243,7 +208,7 @@
@Override
public List<Dataset> getDataverseDatasets(MetadataTransactionContext ctx, String dataverseName)
throws MetadataException {
- List<Dataset> dataverseDatasets = new ArrayList<Dataset>();
+ List<Dataset> dataverseDatasets = new ArrayList<>();
// add uncommitted temporary datasets
for (Dataset dataset : ctx.getDataverseDatasets(dataverseName)) {
if (dataset.getDatasetDetails().isTemp()) {
@@ -339,7 +304,7 @@
@Override
public List<Index> getDatasetIndexes(MetadataTransactionContext ctx, String dataverseName, String datasetName)
throws MetadataException {
- List<Index> datasetIndexes = new ArrayList<Index>();
+ List<Index> datasetIndexes = new ArrayList<>();
Dataset dataset = findDataset(ctx, dataverseName, datasetName);
if (dataset == null) {
return datasetIndexes;
@@ -373,7 +338,7 @@
public CompactionPolicy getCompactionPolicy(MetadataTransactionContext ctx, String dataverse, String policyName)
throws MetadataException {
- CompactionPolicy compactionPolicy = null;
+ CompactionPolicy compactionPolicy;
try {
compactionPolicy = metadataNode.getCompactionPolicy(ctx.getJobId(), dataverse, policyName);
} catch (RemoteException e) {
@@ -434,7 +399,7 @@
ARecordType aRecType = (ARecordType) datatype.getDatatype();
return new Datatype(
datatype.getDataverseName(), datatype.getDatatypeName(), new ARecordType(aRecType.getTypeName(),
- aRecType.getFieldNames(), aRecType.getFieldTypes(), aRecType.isOpen()),
+ aRecType.getFieldNames(), aRecType.getFieldTypes(), aRecType.isOpen()),
datatype.getIsAnonymous());
}
try {
@@ -710,7 +675,7 @@
@Override
public DatasourceAdapter getAdapter(MetadataTransactionContext ctx, String dataverseName, String name)
throws MetadataException {
- DatasourceAdapter adapter = null;
+ DatasourceAdapter adapter;
try {
adapter = metadataNode.getAdapter(ctx.getJobId(), dataverseName, name);
} catch (RemoteException e) {
@@ -733,7 +698,7 @@
@Override
public List<Library> getDataverseLibraries(MetadataTransactionContext ctx, String dataverseName)
throws MetadataException {
- List<Library> dataverseLibaries = null;
+ List<Library> dataverseLibaries;
try {
// Assuming that the transaction can read its own writes on the
// metadata node.
@@ -759,7 +724,7 @@
@Override
public Library getLibrary(MetadataTransactionContext ctx, String dataverseName, String libraryName)
throws MetadataException, RemoteException {
- Library library = null;
+ Library library;
try {
library = metadataNode.getLibrary(ctx.getJobId(), dataverseName, libraryName);
} catch (RemoteException e) {
@@ -792,18 +757,18 @@
public FeedPolicyEntity getFeedPolicy(MetadataTransactionContext ctx, String dataverse, String policyName)
throws MetadataException {
- FeedPolicyEntity FeedPolicy = null;
+ FeedPolicyEntity feedPolicy;
try {
- FeedPolicy = metadataNode.getFeedPolicy(ctx.getJobId(), dataverse, policyName);
+ feedPolicy = metadataNode.getFeedPolicy(ctx.getJobId(), dataverse, policyName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
- return FeedPolicy;
+ return feedPolicy;
}
@Override
public Feed getFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException {
- Feed feed = null;
+ Feed feed;
try {
feed = metadataNode.getFeed(ctx.getJobId(), dataverse, feedName);
} catch (RemoteException e) {
@@ -814,7 +779,7 @@
@Override
public void dropFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException {
- Feed feed = null;
+ Feed feed;
try {
feed = metadataNode.getFeed(ctx.getJobId(), dataverse, feedName);
metadataNode.dropFeed(ctx.getJobId(), dataverse, feedName);
@@ -834,6 +799,7 @@
ctx.addFeed(feed);
}
+ @Override
public List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext mdTxnCtx, String dataverse)
throws MetadataException {
List<DatasourceAdapter> dataverseAdapters;
@@ -845,9 +811,10 @@
return dataverseAdapters;
}
+ @Override
public void dropFeedPolicy(MetadataTransactionContext mdTxnCtx, String dataverseName, String policyName)
throws MetadataException {
- FeedPolicyEntity feedPolicy = null;
+ FeedPolicyEntity feedPolicy;
try {
feedPolicy = metadataNode.getFeedPolicy(mdTxnCtx.getJobId(), dataverseName, policyName);
metadataNode.dropFeedPolicy(mdTxnCtx.getJobId(), dataverseName, policyName);
@@ -901,7 +868,7 @@
@Override
public ExternalFile getExternalFile(MetadataTransactionContext ctx, String dataverseName, String datasetName,
- Integer fileNumber) throws MetadataException {
+ Integer fileNumber) throws MetadataException {
ExternalFile file;
try {
file = metadataNode.getExternalFile(ctx.getJobId(), dataverseName, datasetName, fileNumber);
@@ -939,7 +906,7 @@
cache.cleanupTempDatasets();
}
- private Dataset findDataset(MetadataTransactionContext ctx, String dataverseName, String datasetName) {
+ public Dataset findDataset(MetadataTransactionContext ctx, String dataverseName, String datasetName) {
Dataset dataset = ctx.getDataset(dataverseName, datasetName);
if (dataset == null) {
dataset = cache.getDataset(dataverseName, datasetName);
@@ -969,7 +936,8 @@
@Override
public <T extends IExtensionMetadataEntity> List<T> getEntities(MetadataTransactionContext mdTxnCtx,
- IExtensionMetadataSearchKey searchKey) throws MetadataException {
+ IExtensionMetadataSearchKey searchKey)
+ throws MetadataException {
try {
return metadataNode.getEntities(mdTxnCtx.getJobId(), searchKey);
} catch (RemoteException e) {
@@ -977,7 +945,49 @@
}
}
- public static synchronized void instantiate(MetadataManager metadataManager) {
- MetadataManager.INSTANCE = metadataManager;
+ @Override
+ public void rebindMetadataNode() {
+ rebindMetadataNode = true;
+ }
+
+ public static void initialize(IAsterixStateProxy proxy, AsterixMetadataProperties metadataProperties) {
+ INSTANCE = new CCMetadataManagerImpl(proxy, metadataProperties);
+ }
+
+ public static void initialize(IAsterixStateProxy proxy, MetadataNode metadataNode) {
+ INSTANCE = new MetadataManager(proxy, metadataNode);
+ }
+
+ private static class CCMetadataManagerImpl extends MetadataManager {
+ private final AsterixMetadataProperties metadataProperties;
+
+ public CCMetadataManagerImpl(IAsterixStateProxy proxy, AsterixMetadataProperties metadataProperties) {
+ super(proxy);
+ this.metadataProperties = metadataProperties;
+ }
+
+ @Override
+ public synchronized void init() throws HyracksDataException {
+ if (metadataNode != null && !rebindMetadataNode) {
+ return;
+ }
+ try {
+ metadataNode = proxy.waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(),
+ TimeUnit.SECONDS);
+ if (metadataNode != null) {
+ rebindMetadataNode = false;
+ } else {
+ throw new HyracksDataException("The MetadataNode failed to bind before the configured timeout ("
+ + metadataProperties.getRegistrationTimeoutSecs() + " seconds); the MetadataNode was " +
+ "configured to run on NC: " + metadataProperties.getMetadataNodeName());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new HyracksDataException(e);
+ } catch (RemoteException e) {
+ throw new HyracksDataException(e);
+ }
+ super.init();
+ }
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
index 7717a79..c94e159 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java
@@ -22,12 +22,13 @@
import java.io.Serializable;
import java.rmi.Remote;
import java.rmi.RemoteException;
+import java.util.concurrent.TimeUnit;
/**
* Interface for setting/getting distributed state of Asterix.
*/
public interface IAsterixStateProxy extends Remote, Serializable {
- public void setMetadataNode(IMetadataNode metadataNode) throws RemoteException;
+ void setMetadataNode(IMetadataNode metadataNode) throws RemoteException;
- public IMetadataNode getMetadataNode() throws RemoteException;
+ IMetadataNode waitForMetadataNode(long waitFor, TimeUnit timeUnit) throws RemoteException, InterruptedException;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index 0acc027..feb4db0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -39,7 +40,6 @@
import org.apache.asterix.metadata.entities.Library;
import org.apache.asterix.metadata.entities.Node;
import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
* A metadata manager provides user access to Asterix metadata (e.g., types,
@@ -53,16 +53,7 @@
* finer levels is the responsibility of the metadata node, not the metadata
* manager or its user.
*/
-public interface IMetadataManager {
-
- /**
- * Initializes the metadata manager, e.g., finds the remote metadata node.
- *
- * @throws RemoteException
- * If an error occurred while contacting the proxy for finding
- * the metadata node.
- */
- void init() throws RemoteException, MetadataException;
+public interface IMetadataManager extends IMetadataBootstrap {
/**
* Begins a transaction on the metadata node.
@@ -256,7 +247,8 @@
* Name of the datavers holding the given dataset.
* @param datasetName
* Name of the dataset holding the index.
- * @indexName Name of the index to retrieve.
+ * @param indexName
+ * Name of the index to retrieve.
* @return An Index instance.
* @throws MetadataException
* For example, if the index does not exist.
@@ -273,7 +265,8 @@
* Name of the datavers holding the given dataset.
* @param datasetName
* Name of the dataset holding the index.
- * @indexName Name of the index to retrieve.
+ * @param indexName
+ * Name of the index to retrieve.
* @throws MetadataException
* For example, if the index does not exist.
*/
@@ -406,7 +399,7 @@
/**
* @param mdTxnCtx
* MetadataTransactionContext of an active metadata transaction.
- * @param function
+ * @param adapter
* An instance of type Adapter that represents the adapter being
* added
* @throws MetadataException
@@ -418,7 +411,7 @@
* MetadataTransactionContext of an active metadata transaction.
* @param dataverseName
* the dataverse associated with the adapter being searched
- * @param Name
+ * @param name
* name of the adapter
* @return
* @throws MetadataException
@@ -438,6 +431,18 @@
void dropAdapter(MetadataTransactionContext ctx, String dataverseName, String name) throws MetadataException;
/**
+ *
+ * @param ctx
+ * MetadataTransactionContext of an active metadata transaction.
+ * @param dataverseName
+ * the dataverse whose associated adapters are being requested
+ * @return
+ * @throws MetadataException
+ */
+ List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext ctx, String dataverseName)
+ throws MetadataException;
+
+ /**
* @param ctx
* @param policy
* @throws MetadataException
@@ -497,6 +502,14 @@
* @param ctx
* @param dataverse
* @param policyName
+ * @throws MetadataException
+ */
+ void dropFeedPolicy(MetadataTransactionContext ctx, String dataverse, String policyName) throws MetadataException;
+
+ /**
+ * @param ctx
+ * @param dataverse
+ * @param policyName
* @return
* @throws MetadataException
*/
@@ -526,7 +539,7 @@
* @param libraryName
* Name of library to be deleted. MetadataException for example,
* if the library does not exists.
- * @throws RemoteException
+ * @throws MetadataException
*/
void dropLibrary(MetadataTransactionContext ctx, String dataverseName, String libraryName) throws MetadataException;
@@ -540,7 +553,6 @@
* Library to be added
* @throws MetadataException
* for example, if the library is already added.
- * @throws RemoteException
*/
void addLibrary(MetadataTransactionContext ctx, Library library) throws MetadataException;
@@ -567,7 +579,6 @@
* dataverse asociated with the library that is to be retrieved.
* @return Library
* @throws MetadataException
- * @throws RemoteException
*/
List<Library> getDataverseLibraries(MetadataTransactionContext ctx, String dataverseName) throws MetadataException;
@@ -671,9 +682,13 @@
* @param searchKey
* @return
* @throws MetadataException
- * @throws HyracksDataException
*/
<T extends IExtensionMetadataEntity> List<T> getEntities(MetadataTransactionContext mdTxnCtx,
- IExtensionMetadataSearchKey searchKey) throws MetadataException, HyracksDataException;
+ IExtensionMetadataSearchKey searchKey) throws MetadataException;
+ /**
+ * Indicate when the metadata node has left or rejoined the cluster, and the MetadataManager should
+ * rebind it
+ */
+ void rebindMetadataNode();
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
index 2f881be..da6bb54 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
@@ -21,6 +21,7 @@
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.apache.asterix.metadata.api.IAsterixStateProxy;
@@ -36,8 +37,8 @@
private IMetadataNode metadataNode;
private static final IAsterixStateProxy cc = new AsterixStateProxy();
- public static IAsterixStateProxy registerRemoteObject() throws RemoteException {
- IAsterixStateProxy stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, 0);
+ public static IAsterixStateProxy registerRemoteObject(int metadataCallbackPort) throws RemoteException {
+ IAsterixStateProxy stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort);
LOGGER.info("Asterix Distributed State Proxy Bound");
return stub;
}
@@ -48,12 +49,21 @@
}
@Override
- public void setMetadataNode(IMetadataNode metadataNode) throws RemoteException {
+ public synchronized void setMetadataNode(IMetadataNode metadataNode) {
this.metadataNode = metadataNode;
+ notifyAll();
}
@Override
- public IMetadataNode getMetadataNode() throws RemoteException {
- return this.metadataNode;
+ public IMetadataNode waitForMetadataNode(long waitFor, TimeUnit timeUnit) throws InterruptedException {
+ synchronized (this) {
+ long timeToWait = TimeUnit.MILLISECONDS.convert(waitFor, timeUnit);
+ while (metadataNode == null && timeToWait > 0) {
+ long startTime = System.currentTimeMillis();
+ wait(timeToWait);
+ timeToWait -= System.currentTimeMillis() - startTime;
+ }
+ return metadataNode;
+ }
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
index 5cb1a6a..dba3bc7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java
@@ -63,7 +63,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
index 4ae73ea..cb56c39 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java
@@ -48,7 +48,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
ClusterStateManager.INSTANCE.processCompleteFailbackResponse(this);
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
index c112366..7283b89 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java
@@ -72,7 +72,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
index db89f7c..d87cd23 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java
@@ -39,7 +39,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
ClusterStateManager.INSTANCE.processPreparePartitionsFailbackResponse(this);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
index fc55968..7776543 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java
@@ -54,7 +54,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
index f9f6233..c1319e0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -48,7 +48,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
IAsterixResourceIdManager resourceIdManager =
AsterixAppContextInfo.INSTANCE.getResourceIdManager();
resourceIdManager.report(src, maxResourceId);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
index 203104e..a1290df 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java
@@ -27,7 +27,7 @@
private static final long serialVersionUID = 1L;
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
ReportMaxResourceIdMessage.send((NodeControllerService) cs);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index afe2427..a2c8d74 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -38,7 +38,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
try {
ICCMessageBroker broker =
(ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
index e877f52..7264c88 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java
@@ -35,7 +35,7 @@
private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
index 2466e2b..d3c3502 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java
@@ -37,7 +37,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
ClusterStateManager.INSTANCE.processMetadataNodeTakeoverResponse(this);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
index e024eed..e78f159 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java
@@ -74,7 +74,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
NodeControllerService ncs = (NodeControllerService) cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
index a4a5226..3adc8e9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java
@@ -49,7 +49,7 @@
}
@Override
- public void handle(IControllerService cs) throws HyracksDataException {
+ public void handle(IControllerService cs) throws HyracksDataException, InterruptedException {
ClusterStateManager.INSTANCE.processPartitionTakeoverResponse(this);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
index 471d3d3..592f9df 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java
@@ -19,6 +19,7 @@
package org.apache.asterix.runtime.util;
import java.io.IOException;
+import java.util.function.Supplier;
import java.util.logging.Logger;
import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
@@ -37,8 +38,8 @@
import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.transactions.IAsterixResourceIdManager;
-import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
@@ -66,6 +67,7 @@
private AsterixReplicationProperties replicationProperties;
private AsterixExtensionProperties extensionProperties;
private MessagingProperties messagingProperties;
+ private Supplier<IMetadataBootstrap> metadataBootstrapSupplier;
private IHyracksClientConnection hcc;
private Object extensionManager;
private volatile boolean initialized = false;
@@ -74,8 +76,10 @@
}
public static synchronized void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
- IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager,
- IAsterixResourceIdManager resourceIdManager)
+ IGlobalRecoveryMaanger globalRecoveryMaanger,
+ ILibraryManager libraryManager,
+ IAsterixResourceIdManager resourceIdManager,
+ Supplier<IMetadataBootstrap> metadataBootstrapSupplier)
throws AsterixException, IOException {
if (INSTANCE.initialized) {
throw new AsterixException(AsterixAppContextInfo.class.getSimpleName() + " has been initialized already");
@@ -88,14 +92,7 @@
INSTANCE.resourceIdManager = resourceIdManager;
// Determine whether to use old-style asterix-configuration.xml or new-style configuration.
// QQQ strip this out eventually
- AsterixPropertiesAccessor propertiesAccessor;
- IApplicationConfig cfg = ccAppCtx.getAppConfig();
- // QQQ this is NOT a good way to determine whether the config is valid
- if (cfg.getString("cc", "cluster.address") != null) {
- propertiesAccessor = new AsterixPropertiesAccessor(cfg);
- } else {
- propertiesAccessor = new AsterixPropertiesAccessor();
- }
+ AsterixPropertiesAccessor propertiesAccessor = AsterixPropertiesAccessor.getInstance(ccAppCtx.getAppConfig());
INSTANCE.compilerProperties = new AsterixCompilerProperties(propertiesAccessor);
INSTANCE.externalProperties = new AsterixExternalProperties(propertiesAccessor);
INSTANCE.metadataProperties = new AsterixMetadataProperties(propertiesAccessor);
@@ -107,6 +104,8 @@
INSTANCE.hcc = hcc;
INSTANCE.buildProperties = new AsterixBuildProperties(propertiesAccessor);
INSTANCE.messagingProperties = new MessagingProperties(propertiesAccessor);
+ INSTANCE.metadataBootstrapSupplier = metadataBootstrapSupplier;
+
Logger.getLogger("org.apache.asterix").setLevel(INSTANCE.externalProperties.getLogLevel());
Logger.getLogger("org.apache.hyracks").setLevel(INSTANCE.externalProperties.getLogLevel());
}
@@ -204,4 +203,8 @@
public IAsterixResourceIdManager getResourceIdManager() {
return resourceIdManager;
}
+
+ public IMetadataBootstrap getMetadataBootstrap() {
+ return metadataBootstrapSupplier.get();
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
index bc15788..942abe3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
@@ -51,6 +51,8 @@
import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.json.JSONException;
import org.json.JSONObject;
@@ -109,7 +111,7 @@
}
}
- public synchronized void removeNCConfiguration(String nodeId) {
+ public synchronized void removeNCConfiguration(String nodeId) throws HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Removing configuration parameters for node id " + nodeId);
}
@@ -139,7 +141,8 @@
}
}
- public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
+ public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration)
+ throws HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registering configuration parameters for node id " + nodeId);
}
@@ -167,7 +170,7 @@
updateNodePartitions(nodeId, true);
}
- private synchronized void updateNodePartitions(String nodeId, boolean added) {
+ private synchronized void updateNodePartitions(String nodeId, boolean added) throws HyracksDataException {
ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
// if this isn't a storage node, it will not have cluster partitions
if (nodePartitions != null) {
@@ -183,7 +186,7 @@
}
}
- private synchronized void updateClusterState() {
+ private synchronized void updateClusterState() throws HyracksDataException {
for (ClusterPartition p : clusterPartitions.values()) {
if (!p.isActive()) {
state = ClusterState.UNUSABLE;
@@ -191,11 +194,14 @@
return;
}
}
- //if all storage partitions are active as well as the metadata node, then the cluster is active
+ // if all storage partitions are active as well as the metadata node, then the cluster is active
if (metadataNodeActive) {
+ state = ClusterState.PENDING;
+ LOGGER.info("Cluster is now " + state);
+ AsterixAppContextInfo.INSTANCE.getMetadataBootstrap().init();
state = ClusterState.ACTIVE;
- LOGGER.info("Cluster is now ACTIVE");
- //start global recovery
+ LOGGER.info("Cluster is now " + state);
+ // start global recovery
AsterixAppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
processPendingFailbackPlans();
@@ -412,19 +418,21 @@
}
}
- public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage reponse) {
- for (Integer partitonId : reponse.getPartitions()) {
+ public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response)
+ throws HyracksDataException {
+ for (Integer partitonId : response.getPartitions()) {
ClusterPartition partition = clusterPartitions.get(partitonId);
partition.setActive(true);
- partition.setActiveNodeId(reponse.getNodeId());
+ partition.setActiveNodeId(response.getNodeId());
}
- pendingTakeoverRequests.remove(reponse.getRequestId());
+ pendingTakeoverRequests.remove(response.getRequestId());
resetClusterPartitionConstraint();
updateClusterState();
}
- public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage reponse) {
- currentMetadataNode = reponse.getNodeId();
+ public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage response)
+ throws HyracksDataException {
+ currentMetadataNode = response.getNodeId();
metadataNodeActive = true;
LOGGER.info("Current metadata node: " + currentMetadataNode);
updateClusterState();
@@ -556,7 +564,8 @@
}
}
- public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage reponse) {
+ public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage response)
+ throws HyracksDataException {
/**
* the failback plan completed successfully:
* Remove all references to it.
@@ -564,7 +573,7 @@
* Notify its replicas to reconnect to it.
* Set the failing back node partitions as active.
*/
- NodeFailbackPlan plan = planId2FailbackPlanMap.remove(reponse.getPlanId());
+ NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId());
String nodeId = plan.getNodeId();
failedNodes.remove(nodeId);
//notify impacted replicas they can reconnect to this node
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
index e3a561d..77effb6 100644
--- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
@@ -155,7 +155,7 @@
}
}
- @Parameters
+ @Parameters(name = "NCServiceExecutionTest {index}: {0}")
public static Collection<Object[]> tests() throws Exception {
Collection<Object[]> testArgs = new ArrayList<Object[]>();
TestCaseContext.Builder b = new TestCaseContext.Builder();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
index 733382b..a9bef18 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
@@ -21,8 +21,11 @@
import java.util.Map;
import java.util.Set;
+import org.apache.hyracks.api.exceptions.HyracksException;
+
/**
- * A listener interface for providing notification call backs to events such as a Node Controller joining/leaving the cluster.
+ * A listener interface for providing notification call backs to events such as a Node Controller joining/leaving the
+ * cluster.
*/
public interface IClusterLifecycleListener {
@@ -35,15 +38,15 @@
/**
* @param nodeId
* A unique identifier of a Node Controller
- * @param ncConfig
+ * @param ncConfiguration
* A map containing the set of configuration parameters that were used to start the Node Controller
*/
- public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration);
+ public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException;
/**
* @param deadNodeIds
* A set of Node Controller Ids that have left the cluster. The set is not cumulative.
*/
- public void notifyNodeFailure(Set<String> deadNodeIds);
+ public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
index 786a89f..dff3107 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
@@ -23,7 +23,7 @@
import org.apache.hyracks.control.common.controllers.CCConfig;
public class CCDriver {
- public static void main(String args[]) throws Exception {
+ public static void main(String args []) throws Exception {
try {
CCConfig ccConfig = new CCConfig();
CmdLineParser cp = new CmdLineParser(ccConfig);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index dfce7b8..dd6f83b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -110,7 +110,7 @@
}
}
- public void notifyNodeFailure(Set<String> deadNodeIds) {
+ public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException {
for (IClusterLifecycleListener l : clusterLifecycleListeners) {
l.notifyNodeFailure(deadNodeIds);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index b3a3065..510c729 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -24,6 +24,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
@@ -41,7 +42,7 @@
@Override
public void run() {
- Set<String> deadNodes = new HashSet<String>();
+ final Set<String> deadNodes = new HashSet<String>();
Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
NodeControllerState state = e.getValue();
@@ -69,8 +70,12 @@
}
}
}
- if (deadNodes != null && deadNodes.size() > 0) {
- ccs.getApplicationContext().notifyNodeFailure(deadNodes);
+ if (!deadNodes.isEmpty()) {
+ try {
+ ccs.getApplicationContext().notifyNodeFailure(deadNodes);
+ } catch (HyracksException e) {
+ LOGGER.log(Level.WARNING, "Uncaught exception on notifyNodeFailure", e);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index 64bd7d1..98d6375 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -34,7 +34,9 @@
@Option(name = "-address", usage = "IP Address for CC (default: localhost)", required = false)
public String ipAddress = InetAddress.getLoopbackAddress().getHostAddress();
- @Option(name = "-client-net-ip-address", usage = "Sets the IP Address to listen for connections from clients (default: same as -address)", required = false)
+ @Option(name = "-client-net-ip-address",
+ usage = "Sets the IP Address to listen for connections from clients (default: same as -address)",
+ required = false)
public String clientNetIpAddress;
@Option(name = "-client-net-port", usage = "Sets the port to listen for connections from clients (default 1098)")
@@ -43,46 +45,60 @@
// QQQ Note that clusterNetIpAddress is *not directly used* yet. Both
// the cluster listener and the web server listen on "all interfaces".
// This IP address is only used to instruct the NC on which IP to call in.
- @Option(name = "-cluster-net-ip-address", usage = "Sets the IP Address to listen for connections from NCs (default: same as -address)", required = false)
+ @Option(name = "-cluster-net-ip-address",
+ usage = "Sets the IP Address to listen for connections from NCs (default: same as -address)",
+ required = false)
public String clusterNetIpAddress;
- @Option(name = "-cluster-net-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
+ @Option(name = "-cluster-net-port",
+ usage = "Sets the port to listen for connections from node controllers (default 1099)")
public int clusterNetPort = 1099;
@Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 16001)")
public int httpPort = 16001;
- @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
+ @Option(name = "-heartbeat-period",
+ usage = "Sets the time duration between two heartbeats from each node controller in milliseconds" +
+ " (default: 10000)")
public int heartbeatPeriod = 10000;
- @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
+ @Option(name = "-max-heartbeat-lapse-periods",
+ usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
public int maxHeartbeatLapsePeriods = 5;
- @Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node controller in milliseconds. 0 to disable. (default: 0)")
+ @Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node " +
+ "controller in milliseconds. 0 to disable. (default: 0)")
public int profileDumpPeriod = 0;
- @Option(name = "-default-max-job-attempts", usage = "Sets the default number of job attempts allowed if not specified in the job specification. (default: 5)")
+ @Option(name = "-default-max-job-attempts", usage = "Sets the default number of job attempts allowed if not " +
+ "specified in the job specification. (default: 5)")
public int defaultMaxJobAttempts = 5;
- @Option(name = "-job-history-size", usage = "Limits the number of historical jobs remembered by the system to the specified value. (default: 10)")
+ @Option(name = "-job-history-size", usage = "Limits the number of historical jobs remembered by the system to " +
+ "the specified value. (default: 10)")
public int jobHistorySize = 10;
- @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should be retained by the system in milliseconds. (default: 24 hours)")
+ @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should " +
+ "be retained by the system in milliseconds. (default: 24 hours)")
public long resultTTL = 86400000;
- @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup should be invoked in milliseconds. (default: 1 minute)")
+ @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup " +
+ "should be invoked in milliseconds. (default: 1 minute)")
public long resultSweepThreshold = 60000;
- @Option(name = "-cc-root", usage = "Sets the root folder used for file operations. (default: ClusterControllerService)")
+ @Option(name = "-cc-root",
+ usage = "Sets the root folder used for file operations. (default: ClusterControllerService)")
public String ccRoot = "ClusterControllerService";
- @Option(name = "-cluster-topology", required = false, usage = "Sets the XML file that defines the cluster topology. (default: null)")
+ @Option(name = "-cluster-topology", required = false,
+ usage = "Sets the XML file that defines the cluster topology. (default: null)")
public File clusterTopologyDefinition = null;
@Option(name = "-app-cc-main-class", required = false, usage = "Application CC Main Class")
public String appCCMainClass = null;
- @Option(name = "-config-file", usage = "Specify path to master configuration file (default: none)", required = false)
+ @Option(name = "-config-file",
+ usage = "Specify path to master configuration file (default: none)", required = false)
public String configFile = null;
@Argument
@@ -132,8 +148,8 @@
}
// "address" is the default for all IP addresses
- if (clusterNetIpAddress == null) clusterNetIpAddress = ipAddress;
- if (clientNetIpAddress == null) clientNetIpAddress = ipAddress;
+ clusterNetIpAddress = clusterNetIpAddress == null ? ipAddress : clusterNetIpAddress;
+ clientNetIpAddress = clientNetIpAddress == null ? ipAddress : clientNetIpAddress;
}
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
index c6c3e73..e999de4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
@@ -35,7 +35,11 @@
* the section "nc/red", but if it is not found, will look in the section "nc".
*/
public class IniUtils {
- private static <T> T getIniValue(Ini ini, String section, String key, T default_value, Class<T> clazz) {
+
+ private IniUtils() {
+ }
+
+ private static <T> T getIniValue(Ini ini, String section, String key, T defaultValue, Class<T> clazz) {
T value;
while (true) {
value = ini.get(section, key, clazz);
@@ -48,7 +52,7 @@
}
break;
}
- return (value != null) ? value : default_value;
+ return (value != null) ? value : defaultValue;
}
@SuppressWarnings("unchecked")
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index ce5043a..2e47f41 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -288,11 +288,9 @@
configuration.put("messaging-port", String.valueOf(messagingPort));
configuration.put("messaging-public-ip-address", messagingPublicIPAddress);
configuration.put("messaging-public-port", String.valueOf(messagingPublicPort));
+ configuration.put("ncservice-pid", String.valueOf(ncservicePid));
if (appNCMainClass != null) {
configuration.put("app-nc-main-class", appNCMainClass);
}
- if (ncservicePid != 0) {
- configuration.put("ncservice-pid", String.valueOf(ncservicePid));
- }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/SynchronizableWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/SynchronizableWork.java
index 02a789a..f9952db 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/SynchronizableWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/SynchronizableWork.java
@@ -18,6 +18,9 @@
*/
package org.apache.hyracks.control.common.work;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
public abstract class SynchronizableWork extends AbstractWork {
private boolean done;
@@ -34,8 +37,9 @@
public final void run() {
try {
doRun();
- } catch (Exception e) {
- this.e = e;
+ } catch (Exception ex) {
+ Logger.getLogger(getClass().getName()).log(Level.INFO, "Exception thrown from work", ex);
+ this.e = ex;
} finally {
synchronized (this) {
done = true;
@@ -46,11 +50,7 @@
public final synchronized void sync() throws Exception {
while (!done) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw e;
- }
+ wait();
}
if (e != null) {
throw e;
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java b/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java
index 13607ad..8d1246b 100644
--- a/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java
@@ -41,7 +41,7 @@
public class NCServiceIT {
private static final String TARGET_DIR = StringUtils
- .join(new String[] { System.getProperty("basedir"), "target" }, File.separator);
+ .join(new String[] { ".", "target" }, File.separator);
private static final String LOG_DIR = StringUtils
.join(new String[] { TARGET_DIR, "failsafe-reports" }, File.separator);
private static final String RESOURCE_DIR = StringUtils