[ASTERIXDB-2110] Introduce Cluster Controller Id
Change-Id: Iec1b01444bfbd923e38f5c162c5244e17c4d5f03
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2323
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index bfa648a..aa9ac98 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -31,8 +31,8 @@
import java.util.concurrent.TimeoutException;
import org.apache.asterix.active.message.ActiveManagerMessage;
-import org.apache.asterix.active.message.ActiveStatsResponse;
import org.apache.asterix.active.message.ActiveStatsRequestMessage;
+import org.apache.asterix.active.message.ActiveStatsResponse;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -116,7 +116,7 @@
LOGGER.warn("Request stats of a runtime that is not registered " + runtimeId);
// Send a failure message
((NodeControllerService) serviceCtx.getControllerService())
- .sendApplicationMessageToCC(
+ .sendApplicationMessageToCC(message.getCcId(),
JavaSerializationUtils
.serialize(new ActiveStatsResponse(reqId, null,
new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME,
@@ -126,7 +126,7 @@
String stats = runtime.getStats();
ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null);
((NodeControllerService) serviceCtx.getControllerService())
- .sendApplicationMessageToCC(JavaSerializationUtils.serialize(response), null);
+ .sendApplicationMessageToCC(message.getCcId(), JavaSerializationUtils.serialize(response), null);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index bef418b..b8c44a6 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -22,10 +22,11 @@
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.CcIdentifiedMessage;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class ActiveManagerMessage implements INcAddressedMessage {
+public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddressedMessage {
public enum Kind {
STOP_ACTIVITY,
REQUEST_STATS
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
index 87beae1..c3e02af 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
@@ -62,7 +62,8 @@
try {
CancelQueryRequest cancelQueryMessage =
new CancelQueryRequest(serviceCtx.getNodeId(), cancelQueryFuture.getFutureId(), clientContextId);
- messageBroker.sendMessageToCC(cancelQueryMessage);
+ // TODO(mblow): multicc -- need to send cancellation to the correct cc
+ messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
cancelQueryFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
response.setStatus(HttpResponseStatus.OK);
} catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 76f489c..5cbac64 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -88,7 +88,7 @@
responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(),
resultProperties.getNcToCcResultProperties(), param.clientContextID, handleUrl, optionalParameters);
execution.start();
- ncMb.sendMessageToCC(requestMsg);
+ ncMb.sendMessageToPrimaryCC(requestMsg);
try {
responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
@@ -137,7 +137,8 @@
try {
CancelQueryRequest cancelQueryMessage =
new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), clientContextID);
- messageBroker.sendMessageToCC(cancelQueryMessage);
+ // TODO(mblow): multicc -- need to send cancellation to the correct cc
+ messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
if (wait) {
cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS,
TimeUnit.MILLISECONDS);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
index 49a84e1..e41bc60 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
@@ -20,6 +20,7 @@
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
@@ -33,7 +34,7 @@
}
@Override
- public void perform(IControllerService cs) throws HyracksDataException {
+ public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
try {
if (exportStub) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
index 02c377a..6f1775e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CheckpointTask.java
@@ -21,6 +21,7 @@
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
@@ -29,7 +30,7 @@
private static final long serialVersionUID = 1L;
@Override
- public void perform(IControllerService cs) throws HyracksDataException {
+ public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
ICheckpointManager checkpointMgr = appContext.getTransactionSubsystem().getCheckpointManager();
checkpointMgr.doSharpCheckpoint();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
index 4e330c6..8cfeb12 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
@@ -21,6 +21,7 @@
import org.apache.asterix.app.external.ExternalLibraryUtils;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
@@ -34,7 +35,7 @@
}
@Override
- public void perform(IControllerService cs) throws HyracksDataException {
+ public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
try {
ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
index eb19ad6..d0a8dcc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
@@ -38,7 +39,7 @@
}
@Override
- public void perform(IControllerService cs) throws HyracksDataException {
+ public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
try {
appContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
index 784f3b0..001af23 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -21,6 +21,7 @@
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
@@ -29,7 +30,7 @@
private static final long serialVersionUID = 1L;
@Override
- public void perform(IControllerService cs) throws HyracksDataException {
+ public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
try {
SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
index 86f7d1c..53b13e8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java
@@ -20,6 +20,7 @@
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.runtime.message.ReportLocalCountersMessage;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -29,8 +30,8 @@
private static final long serialVersionUID = 1L;
@Override
- public void perform(IControllerService cs) throws HyracksDataException {
- ReportLocalCountersMessage.send((NodeControllerService) cs);
+ public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
+ ReportLocalCountersMessage.send(ccId, (NodeControllerService) cs);
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
index 7db473e..7ecc669 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartLifecycleComponentsTask.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.hyracks.bootstrap.AsterixStateDumpHandler;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
@@ -39,7 +40,7 @@
private static final long serialVersionUID = 1L;
@Override
- public void perform(IControllerService cs) throws HyracksDataException {
+ public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
INcApplicationContext applicationContext = (INcApplicationContext) cs.getApplicationContext();
NCServiceContext serviceCtx = (NCServiceContext) cs.getContext();
MetadataProperties metadataProperties = applicationContext.getMetadataProperties();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
index 7071271..0cfb6b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
@@ -20,6 +20,7 @@
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
@@ -28,7 +29,7 @@
private static final long serialVersionUID = 1L;
@Override
- public void perform(IControllerService cs) throws HyracksDataException {
+ public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
try {
// open replication channel
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
index b0e1e06..a8c98c7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
@@ -19,6 +19,7 @@
package org.apache.asterix.app.replication.message;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.CcIdentifiedMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -27,7 +28,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class MetadataNodeRequestMessage implements INCLifecycleMessage, INcAddressedMessage {
+public class MetadataNodeRequestMessage extends CcIdentifiedMessage
+ implements INCLifecycleMessage, INcAddressedMessage {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LogManager.getLogger();
@@ -55,7 +57,7 @@
MetadataNodeResponseMessage reponse =
new MetadataNodeResponseMessage(appContext.getTransactionSubsystem().getId(), export);
try {
- broker.sendMessageToCC(reponse);
+ broker.sendMessageToCC(getCcId(), reponse);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Failed taking over metadata", e);
hde = HyracksDataException.suppress(hde, e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index 6ca576a..62e7a69 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.logging.log4j.Level;
@@ -44,12 +45,12 @@
this.nodeStatus = nodeStatus;
}
- public static void send(NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState)
+ public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState)
throws HyracksDataException {
try {
RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
systemState);
- ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(msg);
+ ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index d4c2340..a6f10ca 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -22,6 +22,7 @@
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.CcIdentifiedMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -33,7 +34,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class RegistrationTasksResponseMessage implements INCLifecycleMessage, INcAddressedMessage {
+public class RegistrationTasksResponseMessage extends CcIdentifiedMessage
+ implements INCLifecycleMessage, INcAddressedMessage {
private static final Logger LOGGER = LogManager.getLogger();
private static final long serialVersionUID = 1L;
@@ -57,7 +59,7 @@
if (LOGGER.isInfoEnabled()) {
LOGGER.log(Level.INFO, "Starting startup task: " + task);
}
- task.perform(cs);
+ task.perform(getCcId(), cs);
if (LOGGER.isInfoEnabled()) {
LOGGER.log(Level.INFO, "Completed startup task: " + task);
}
@@ -70,7 +72,7 @@
NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success);
result.setException(exception);
try {
- broker.sendMessageToCC(result);
+ broker.sendMessageToCC(getCcId(), result);
} catch (Exception e) {
success = false;
LOGGER.log(Level.ERROR, "Failed sending message to cc", e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 4e9cb47..335899c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -2475,7 +2476,7 @@
}
}, clientContextId, ctx);
} catch (Exception e) {
- if (JobId.INVALID.equals(jobId.getValue())) {
+ if (Objects.equals(JobId.INVALID, jobId.getValue())) {
// compilation failed
ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.FAILED);
ResultUtil.printError(sessionOutput.out(), e);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 11f4e1c..1c7bfb7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -57,7 +57,6 @@
import org.apache.asterix.app.external.ExternalLibraryUtils;
import org.apache.asterix.app.replication.NcLifecycleCoordinator;
import org.apache.asterix.common.api.AsterixThreadFactory;
-import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.common.api.INodeJobTracker;
import org.apache.asterix.common.config.AsterixExtension;
import org.apache.asterix.common.config.ExternalProperties;
@@ -348,14 +347,6 @@
}
@Override
- public void startupCompleted() throws Exception {
- ccServiceCtx.getControllerService().getExecutor().submit(() -> {
- appCtx.getClusterStateManager().waitForState(IClusterManagementWork.ClusterState.ACTIVE);
- return null;
- });
- }
-
- @Override
public IJobCapacityController getJobCapacityController() {
return jobCapacityController;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 8c87a26..932f47c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -60,9 +60,7 @@
@Override
public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("NC: " + nodeId + " joined");
- }
+ LOGGER.info("NC: {} joined", nodeId);
IClusterStateManager csm = appCtx.getClusterStateManager();
csm.notifyNodeJoin(nodeId, ncConfiguration);
@@ -79,9 +77,7 @@
@Override
public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException {
for (String deadNode : deadNodeIds) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("NC: " + deadNode + " left");
- }
+ LOGGER.info("NC: {} left", deadNode);
IClusterStateManager csm = appCtx.getClusterStateManager();
csm.notifyNodeFailure(deadNode);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 0526a32..0d3b7b5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -50,6 +50,7 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -198,19 +199,23 @@
state = SystemState.BOOTSTRAPPING;
}
// Request registration tasks from CC
- RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(),
- NodeStatus.BOOTING, state);
+ // TODO (mblow): multicc
+ final NodeControllerService ncControllerService = (NodeControllerService) ncServiceCtx.getControllerService();
+ RegistrationTasksRequestMessage.send(ncControllerService.getPrimaryClusterController().getCcId(),
+ ncControllerService, NodeStatus.BOOTING, state);
startupCompleted = true;
}
@Override
- public void onRegisterNode() throws Exception {
- if (startupCompleted) {
+ public void onRegisterNode(CcId ccId) throws Exception {
+ // TODO (mblow): multicc
+ if (startupCompleted && ccId.equals(((NodeControllerService) ncServiceCtx.getControllerService())
+ .getPrimaryClusterController().getCcId())) {
/*
* If the node completed its startup before, then this is a re-registration with
* the CC and therefore the system state should be HEALTHY and the node status is ACTIVE
*/
- RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(),
+ RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
NodeStatus.ACTIVE, SystemState.HEALTHY);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 8e8fb93..80e0b33 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -28,6 +28,7 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.ICcIdentifiedMessage;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.messaging.api.INcResponse;
import org.apache.commons.lang3.mutable.MutableInt;
@@ -39,7 +40,6 @@
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -70,6 +70,9 @@
public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
INodeManager nodeManager = ccs.getNodeManager();
NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
+ if (msg instanceof ICcIdentifiedMessage) {
+ ((ICcIdentifiedMessage) msg).setCcId(ccs.getCcId());
+ }
if (state != null) {
state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
} else {
@@ -97,6 +100,9 @@
for (int i = 0; i < ncs.size(); i++) {
String nc = ncs.get(i);
INcAddressedMessage message = requests.get(i);
+ if (!(message instanceof ICcIdentifiedMessage)) {
+ throw new IllegalStateException("sync request message not cc identified: " + message);
+ }
sendApplicationMessageToNC(message, nc);
}
long time = System.currentTimeMillis();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 08e406e..988c7bb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -31,6 +31,7 @@
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.messaging.api.MessageFuture;
import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -68,8 +69,13 @@
}
@Override
- public void sendMessageToCC(ICcAddressedMessage message) throws Exception {
- ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
+ public void sendMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception {
+ ncs.sendApplicationMessageToCC(ccId, JavaSerializationUtils.serialize(message), null);
+ }
+
+ @Override
+ public void sendMessageToPrimaryCC(ICcAddressedMessage message) throws Exception {
+ sendMessageToCC(ncs.getPrimaryClusterController().getCcId(), message);
}
@Override
@@ -145,7 +151,7 @@
*/
@Override
public void run() {
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
INcAddressedMessage msg = null;
try {
msg = receivedMsgsQ.take();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 093d150..9612ead 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -54,6 +54,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.job.JobStatus;
@@ -98,7 +99,7 @@
@Before
public void setUp() throws Exception {
- jobIdFactory = new JobIdFactory();
+ jobIdFactory = new JobIdFactory(CcId.valueOf((short) 0));
handler = new ActiveNotificationHandler();
allDatasets = new ArrayList<>();
firstDataset = new Dataset(dataverseName, "firstDataset", null, null, null, null, null, null, null, null, 0, 0);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
index c30e999..138f0e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INCLifecycleTask.java
@@ -20,6 +20,7 @@
import java.io.Serializable;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
@@ -29,8 +30,9 @@
/**
* Performs the task.
*
+ * @param ccId
* @param cs
* @throws HyracksDataException
*/
- void perform(IControllerService cs) throws HyracksDataException;
+ void perform(CcId ccId, IControllerService cs) throws HyracksDataException;
}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CcIdentifiedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CcIdentifiedMessage.java
new file mode 100644
index 0000000..d8a68ef
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/CcIdentifiedMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import java.io.Serializable;
+
+import org.apache.asterix.common.messaging.api.ICcIdentifiedMessage;
+import org.apache.hyracks.api.control.CcId;
+
+public abstract class CcIdentifiedMessage implements ICcIdentifiedMessage, Serializable {
+ private CcId ccId;
+
+ @Override
+ public CcId getCcId() {
+ return ccId;
+ }
+
+ @Override
+ public void setCcId(CcId ccId) {
+ this.ccId = ccId;
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 33a8ff3..208686c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -23,7 +23,7 @@
import org.apache.hyracks.api.messages.IMessageBroker;
public interface ICCMessageBroker extends IMessageBroker {
- public enum ResponseState {
+ enum ResponseState {
UNINITIALIZED,
SUCCESS,
FAILURE
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java
index 37cba9c..549f1dd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcAddressedMessage.java
@@ -29,5 +29,4 @@
* handle the message upon delivery
*/
void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException;
-
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcIdentifiedMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcIdentifiedMessage.java
new file mode 100644
index 0000000..d144498
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICcIdentifiedMessage.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.control.CcId;
+
+public interface ICcIdentifiedMessage {
+ CcId getCcId();
+
+ void setCcId(CcId ccId);
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index 86d1074..6ec2d7e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -18,17 +18,26 @@
*/
package org.apache.asterix.common.messaging.api;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.messages.IMessageBroker;
public interface INCMessageBroker extends IMessageBroker {
/**
+ * Sends application message from this NC to the primary CC.
+ *
+ * @param message
+ * @throws Exception
+ */
+ public void sendMessageToPrimaryCC(ICcAddressedMessage message) throws Exception;
+
+ /**
* Sends application message from this NC to the CC.
*
* @param message
* @throws Exception
*/
- public void sendMessageToCC(ICcAddressedMessage message) throws Exception;
+ public void sendMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception;
/**
* Sends application message from this NC to another NC.
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
index 1988f0a..b35d02a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
@@ -24,9 +24,9 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.metadata.IMetadataLock;
-import org.apache.asterix.common.utils.InvokeUtil;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.util.InvokeUtil;
public class DatasetLock implements IMetadataLock {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
index 0b321a2..4b95253 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.logging.log4j.Level;
@@ -51,8 +52,7 @@
resourceIdManager.report(src, maxResourceId);
}
- public static void send(NodeControllerService cs) throws HyracksDataException {
- NodeControllerService ncs = cs;
+ public static void send(CcId ccId, NodeControllerService ncs) throws HyracksDataException {
INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext();
long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
@@ -60,7 +60,7 @@
ReportLocalCountersMessage countersMessage =
new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId);
try {
- ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(countersMessage);
+ ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(ccId, countersMessage);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Unable to report local counters", e);
throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
index 785ad2f..51f53e7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java
@@ -19,16 +19,18 @@
package org.apache.asterix.runtime.message;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.CcIdentifiedMessage;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.nc.NodeControllerService;
-public class ReportLocalCountersRequestMessage implements INcAddressedMessage {
+public class ReportLocalCountersRequestMessage extends CcIdentifiedMessage implements INcAddressedMessage {
private static final long serialVersionUID = 1L;
@Override
public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
- ReportLocalCountersMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService());
+ ReportLocalCountersMessage.send(getCcId(),
+ (NodeControllerService) appCtx.getServiceContext().getControllerService());
}
@Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index b7a4c14..78b1f17 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -62,7 +62,7 @@
//if no response available or it has an exception, request a new one
if (reponse == null || reponse.getException() != null) {
ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId);
- ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToCC(msg);
+ ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
reponse = resourceIdResponseQ.take();
if (reponse.getException() != null) {
throw HyracksDataException.create(reponse.getException());
diff --git a/asterixdb/asterix-transactions/pom.xml b/asterixdb/asterix-transactions/pom.xml
index 22ef244..80fbd6c 100644
--- a/asterixdb/asterix-transactions/pom.xml
+++ b/asterixdb/asterix-transactions/pom.xml
@@ -155,5 +155,9 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 00c6c13..840a19b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -54,8 +54,8 @@
import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.common.transactions.MutableLong;
import org.apache.asterix.common.transactions.TxnLogFile;
-import org.apache.asterix.common.utils.InvokeUtil;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.util.InvokeUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 53cd038..0bb8293 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -28,7 +28,7 @@
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.utils.InvokeUtil;
+import org.apache.hyracks.util.InvokeUtil;
public class LogManagerWithReplication extends LogManager {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
index 64f4e29..af6cb92 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.api.application;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -33,5 +34,5 @@
*/
IFileDeviceResolver getFileDeviceResolver();
- void onRegisterNode() throws Exception;
+ void onRegisterNode(CcId ccId) throws Exception;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java
index 80ff77c..d42cbb3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java
@@ -67,6 +67,10 @@
return (int)get(option);
}
+ default short getShort(IOption option) {
+ return (short)get(option);
+ }
+
default String getString(IOption option) {
return (String)get(option);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java
new file mode 100644
index 0000000..32782fd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/control/CcId.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.control;
+
+import java.io.Serializable;
+
+public class CcId implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private short id;
+
+ private CcId(short id) {
+ this.id = id;
+ }
+
+ public static CcId valueOf(String ccIdString) {
+ return new CcId(Integer.decode(ccIdString).shortValue());
+ }
+
+ public static CcId valueOf(int ccId) {
+ if ((ccId & ~0xffff) != 0) {
+ throw new IllegalArgumentException("ccId cannot exceed 16-bits: " + Integer.toHexString(ccId));
+ }
+ return new CcId((short) ccId);
+ }
+
+ public short shortValue() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return id;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof CcId && id == ((CcId) obj).id;
+ }
+
+ @Override
+ public String toString() {
+ return "CC:" + Integer.toHexString(((int) id) & 0xffff);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index b6d4f6b..35fdb2e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -126,7 +126,7 @@
public static final int ILLEGAL_MEMORY_BUDGET = 90;
public static final int TIMEOUT = 91;
public static final int JOB_HAS_BEEN_CLEARED_FROM_HISTORY = 92;
- public static final int JOB_HAS_NOT_BEEN_CREATED_YET = 93;
+ // 93
public static final int CANNOT_READ_CLOSED_FILE = 94;
public static final int TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME = 95;
public static final int ILLEGAL_ATTEMPT_TO_ENTER_EMPTY_COMPONENT = 96;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
index 47da24a..c83366f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java
@@ -23,16 +23,22 @@
import java.io.IOException;
import java.io.Serializable;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IWritable;
-public final class JobId implements IWritable, Serializable {
+public final class JobId implements IWritable, Serializable, Comparable {
- public static final JobId INVALID = new JobId(-1l);
+ private static final int CC_BITS = Short.SIZE;
+ static final int ID_BITS = Long.SIZE - CC_BITS;
+ static final long MAX_ID = (1L << ID_BITS) - 1;
+
+ public static final JobId INVALID = null;
private static final long serialVersionUID = 1L;
private long id;
+ private transient CcId ccId;
public static JobId create(DataInput dis) throws IOException {
JobId jobId = new JobId();
@@ -51,6 +57,17 @@
return id;
}
+ public CcId getCcId() {
+ if (ccId == null) {
+ ccId = CcId.valueOf((int) (id >>> ID_BITS));
+ }
+ return ccId;
+ }
+
+ public long getIdOnly() {
+ return id & MAX_ID;
+ }
+
@Override
public int hashCode() {
return (int) id;
@@ -58,13 +75,7 @@
@Override
public boolean equals(Object o) {
- if (o == this) {
- return true;
- }
- if (!(o instanceof JobId)) {
- return false;
- }
- return ((JobId) o).id == id;
+ return o == this || o instanceof JobId && ((JobId) o).id == id;
}
@Override
@@ -89,4 +100,9 @@
public void readFields(DataInput input) throws IOException {
id = input.readLong();
}
+
+ @Override
+ public int compareTo(Object other) {
+ return Long.compare(id, ((JobId) other).id);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
index eea6b52..1bb5749 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobIdFactory.java
@@ -18,20 +18,36 @@
*/
package org.apache.hyracks.api.job;
+import static org.apache.hyracks.api.job.JobId.ID_BITS;
+import static org.apache.hyracks.api.job.JobId.MAX_ID;
+
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hyracks.api.control.CcId;
+
public class JobIdFactory {
- private final AtomicLong id = new AtomicLong(0);
+ private final AtomicLong id;
+
+ public JobIdFactory(CcId ccId) {
+ id = new AtomicLong((long) ccId.shortValue() << ID_BITS);
+ }
public JobId create() {
- return new JobId(id.getAndIncrement());
+ return new JobId(id.getAndUpdate(prev -> {
+ if ((prev & MAX_ID) == MAX_ID) {
+ return prev ^ MAX_ID;
+ } else {
+ return prev + 1;
+ }
+ }));
}
- public long maxJobId() {
- return id.get();
- }
-
- public void ensureMinimumId(long id) {
- this.id.updateAndGet(current -> Math.max(current, id));
+ public JobId maxJobId() {
+ long next = id.get();
+ if ((next & MAX_ID) == 0) {
+ return new JobId(next | MAX_ID);
+ } else {
+ return new JobId(next - 1);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 6254b86..465a661 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -109,7 +109,7 @@
90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes)
91 = Operation timed out
92 = Job %1$s has been cleared from job history
-93 = Job %1$s has not been created yet
+# 93
94 = Cannot read closed file (%1$s)
95 = Tuple of size %1$s cannot fit into an empty frame
96 = Illegal attempt to enter empty component
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java
new file mode 100644
index 0000000..d16eb15
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/job/JobIdFactoryTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hyracks.api.control.CcId;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class JobIdFactoryTest {
+
+ private static Field idField;
+
+ @BeforeClass
+ public static void setup() throws NoSuchFieldException {
+ idField = JobIdFactory.class.getDeclaredField("id");
+ idField.setAccessible(true);
+ }
+
+ @Test
+ public void testCcIds() {
+ JobIdFactory factory = new JobIdFactory(CcId.valueOf(0));
+ for (int i = 0; i < 1000; i++) {
+ final JobId jobId = factory.create();
+ Assert.assertEquals(0, jobId.getCcId().shortValue());
+ Assert.assertEquals(i, jobId.getIdOnly());
+ }
+ }
+
+ @Test
+ public void testNegativeCcId() {
+ JobIdFactory factory = new JobIdFactory(CcId.valueOf(0xFFFF));
+ for (int i = 0; i < 1000; i++) {
+ final JobId jobId = factory.create();
+ Assert.assertEquals((short) 0xFFFF, jobId.getCcId().shortValue());
+ Assert.assertEquals(i, jobId.getIdOnly());
+ Assert.assertTrue("JID not negative", jobId.getId() < 0);
+ Assert.assertEquals(0xFFFF000000000000L + i, jobId.getId());
+ }
+ }
+
+ @Test
+ public void testOverflow() throws IllegalAccessException {
+ testOverflow(0);
+ testOverflow(0xFFFF);
+ testOverflow(Short.MAX_VALUE);
+ }
+
+ private void testOverflow(int id) throws IllegalAccessException {
+ CcId ccId = CcId.valueOf(id);
+ long expected = (long) id << 48;
+ JobIdFactory factory = new JobIdFactory(ccId);
+ AtomicLong theId = (AtomicLong) idField.get(factory);
+ Assert.assertEquals(expected, theId.get());
+ theId.set((((long)1 << 48) - 1) | expected);
+ JobId jobId = factory.create();
+ Assert.assertEquals(ccId, jobId.getCcId());
+ Assert.assertEquals(JobId.MAX_ID, jobId.getIdOnly());
+ jobId = factory.create();
+ Assert.assertEquals(ccId, jobId.getCcId());
+ Assert.assertEquals(0, jobId.getIdOnly());
+ }
+
+ @Test
+ public void testComparability() throws IllegalAccessException {
+ JobIdFactory factory = new JobIdFactory(CcId.valueOf(0));
+ compareLoop(factory, false);
+ factory = new JobIdFactory(CcId.valueOf(0xFFFF));
+ compareLoop(factory, false);
+ AtomicLong theId = (AtomicLong) idField.get(factory);
+ theId.set(0xFFFFFFFFFFFFFFF0L);
+ compareLoop(factory, true);
+ }
+
+ private void compareLoop(JobIdFactory factory, boolean overflow) {
+ Set<Boolean> overflowed = new HashSet<>(Collections.singleton(false));
+ JobId prevMax = null;
+ for (int i = 0; i < 1000; i++) {
+ final JobId jobId = factory.create();
+ Assert.assertTrue("max == last", factory.maxJobId().compareTo(jobId) == 0);
+ if (i > 0) {
+ Assert.assertTrue("last > previous max", prevMax.compareTo(jobId) < 0 || overflowed.add(overflow));
+ }
+ prevMax = factory.maxJobId();
+ }
+ }
+
+ @Test
+ public void testTooLarge() {
+ try {
+ CcId.valueOf(0x10000);
+ Assert.assertTrue("expected exception", false);
+ } catch (IllegalArgumentException e) {
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 360975d..1ec7485 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -25,7 +25,6 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -37,14 +36,12 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.application.ICCApplication;
-import org.apache.hyracks.api.application.IClusterLifecycleListener;
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.config.IApplicationConfig;
-import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.context.ICCContext;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -147,6 +144,8 @@
private ShutdownRun shutdownCallback;
+ private final CcId ccId;
+
static {
ExitUtil.init();
}
@@ -182,7 +181,8 @@
// Node manager is in charge of cluster membership management.
nodeManager = new NodeManager(this, ccConfig, resourceManager);
- jobIdFactory = new JobIdFactory();
+ ccId = ccConfig.getCcId();
+ jobIdFactory = new JobIdFactory(ccId);
deployedJobSpecIdFactory = new DeployedJobSpecIdFactory();
}
@@ -252,17 +252,18 @@
}
}
- private Pair<String, Integer> getNCService(String nodeId) {
+ private InetSocketAddress getNCService(String nodeId) {
IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(nodeId);
- return Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS),
- ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT));
+ final int port = ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT);
+ return port == NCConfig.NCSERVICE_PORT_DISABLED ? null
+ : InetSocketAddress.createUnresolved(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS), port);
}
- private Map<String, Pair<String, Integer>> getNCServices() {
- Map<String, Pair<String, Integer>> ncMap = new TreeMap<>();
+ private Map<String, InetSocketAddress> getNCServices() {
+ Map<String, InetSocketAddress> ncMap = new TreeMap<>();
for (String ncId : configManager.getNodeNames()) {
- Pair<String, Integer> ncService = getNCService(ncId);
- if (ncService.getRight() != NCConfig.NCSERVICE_PORT_DISABLED) {
+ InetSocketAddress ncService = getNCService(ncId);
+ if (ncService != null) {
ncMap.put(ncId, ncService);
}
}
@@ -271,31 +272,19 @@
private void connectNCs() {
getNCServices().forEach((key, value) -> {
- final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this, value.getLeft(),
- value.getRight(), key);
+ final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this, value.getHostString(),
+ value.getPort(), key);
executor.submit(triggerWork);
});
- serviceCtx.addClusterLifecycleListener(new IClusterLifecycleListener() {
- @Override
- public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
- // no-op, we don't care
- LOGGER.log(Level.WARN, "Getting notified that node: " + nodeId + " has joined. and we don't care");
- }
-
- @Override
- public void notifyNodeFailure(Collection<String> deadNodeIds) throws HyracksException {
- LOGGER.log(Level.WARN, "Getting notified that nodes: " + deadNodeIds + " has failed");
- }
- });
}
public boolean startNC(String nodeId) {
- Pair<String, Integer> ncServiceAddress = getNCService(nodeId);
+ InetSocketAddress ncServiceAddress = getNCService(nodeId);
if (ncServiceAddress == null) {
return false;
}
- final TriggerNCWork startNc = new TriggerNCWork(ClusterControllerService.this, ncServiceAddress.getLeft(),
- ncServiceAddress.getRight(), nodeId);
+ final TriggerNCWork startNc = new TriggerNCWork(ClusterControllerService.this, ncServiceAddress.getHostString(),
+ ncServiceAddress.getPort(), nodeId);
executor.submit(startNc);
return true;
@@ -304,11 +293,9 @@
private void terminateNCServices() throws Exception {
List<ShutdownNCServiceWork> shutdownNCServiceWorks = new ArrayList<>();
getNCServices().forEach((key, value) -> {
- if (value.getRight() != NCConfig.NCSERVICE_PORT_DISABLED) {
- ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(value.getLeft(), value.getRight(), key);
- workQueue.schedule(shutdownWork);
- shutdownNCServiceWorks.add(shutdownWork);
- }
+ ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(value.getHostString(), value.getPort(), key);
+ workQueue.schedule(shutdownWork);
+ shutdownNCServiceWorks.add(shutdownWork);
});
for (ShutdownNCServiceWork shutdownWork : shutdownNCServiceWorks) {
shutdownWork.sync();
@@ -428,6 +415,10 @@
return deployedJobSpecIdFactory;
}
+ public CcId getCcId() {
+ return ccId;
+ }
+
private final class ClusterControllerContext implements ICCContext {
private final ClusterTopology topology;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 367a1d5..742e2e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -105,7 +105,7 @@
try {
// TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
- ncIPCHandle.send(-1, new AbortCCJobsFunction(), null);
+ ncIPCHandle.send(-1, new AbortCCJobsFunction(ccConfig.getCcId()), null);
} catch (IPCException e) {
throw HyracksDataException.create(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index 3a38287..04a34af 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -57,7 +57,7 @@
try {
LOGGER.log(Level.WARN, "Registering INodeController: id = " + id);
NodeControllerRemoteProxy nc =
- new NodeControllerRemoteProxy(
+ new NodeControllerRemoteProxy(ccs.getCcId(),
ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress()));
NodeControllerState state = new NodeControllerState(nc, reg);
INodeManager nodeManager = ccs.getNodeManager();
@@ -73,7 +73,6 @@
params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriodMillis());
params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
result = new CCNCFunctions.NodeRegistrationResult(params, null);
- ccs.getJobIdFactory().ensureMinimumId(reg.getMaxJobId() + 1);
} catch (Exception e) {
LOGGER.log(Level.WARN, "Node registration failed", e);
result = new CCNCFunctions.NodeRegistrationResult(null, e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index f1d9a4d..53998aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -63,17 +63,15 @@
List<Exception> exceptions;
if (exceptionHistory == null) {
// couldn't be found
- long maxJobId = ccs.getJobIdFactory().maxJobId();
- exceptions = Collections.singletonList(jobId.getId() <= maxJobId
- ? HyracksDataException.create(ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY, jobId)
- : HyracksDataException.create(ErrorCode.JOB_HAS_NOT_BEEN_CREATED_YET, jobId));
+ exceptions = Collections
+ .singletonList(HyracksDataException.create(ErrorCode.JOB_HAS_BEEN_CLEARED_FROM_HISTORY, jobId));
} else {
exceptions = exceptionHistory;
}
ccs.getExecutor().execute(() -> {
if (!exceptions.isEmpty()) {
- /**
+ /*
* only report the first exception because IResultCallback will only throw one exception
* anyway
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index 6fd321e..2307185 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
@@ -35,42 +36,44 @@
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
public interface IClusterController {
- public void registerNode(NodeRegistration reg) throws Exception;
+ void registerNode(NodeRegistration reg) throws Exception;
- public void unregisterNode(String nodeId) throws Exception;
+ void unregisterNode(String nodeId) throws Exception;
- public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+ void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception;
- public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
+ void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, List<Exception> exceptions)
throws Exception;
- public void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception;
+ void notifyDeployedJobSpecFailure(DeployedJobSpecId deployedJobSpecId, String nodeId) throws Exception;
- public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
+ void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
- public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception;
+ void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception;
- public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception;
+ void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception;
- public void notifyShutdown(String nodeId) throws Exception;
+ void notifyShutdown(String nodeId) throws Exception;
- public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
+ void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
- public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
+ void reportProfile(String id, List<JobProfile> profiles) throws Exception;
- public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception;
+ void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception;
- public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
+ void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
- public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+ void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
- public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
+ void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws Exception;
- public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
+ void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
- public void getNodeControllerInfos() throws Exception;
+ void getNodeControllerInfos() throws Exception;
- public void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception;
+ void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception;
+
+ CcId getCcId();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 5d781cf..ef3b27c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -36,30 +36,30 @@
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
public interface INodeController {
- public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
- List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
- Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
+ void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
+ List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
+ Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId)
throws Exception;
- public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
+ void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
- public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception;
+ void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception;
- public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
+ void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
- public void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
+ void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
- public void undeployBinary(DeploymentId deploymentId) throws Exception;
+ void undeployBinary(DeploymentId deploymentId) throws Exception;
- public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception;
+ void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception;
- public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
+ void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception;
- public void dumpState(String stateDumpId) throws Exception;
+ void dumpState(String stateDumpId) throws Exception;
- public void shutdown(boolean terminateNCService) throws Exception;
+ void shutdown(boolean terminateNCService) throws Exception;
- public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+ void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
- public void takeThreadDump(String requestId) throws Exception;
+ void takeThreadDump(String requestId) throws Exception;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
index 62e6ee0..42ed1e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -81,6 +81,27 @@
}
};
+ public static final IOptionType<Short> SHORT = new IOptionType<Short>() {
+ @Override
+ public Short parse(String s) {
+ int value = Integer.decode(s);
+ if (Integer.highestOneBit(value) > 16) {
+ throw new IllegalArgumentException("The given value " + s + " is too big for a short");
+ }
+ return (short)value;
+ }
+
+ @Override
+ public Class<Short> targetType() {
+ return Short.class;
+ }
+
+ @Override
+ public void serializeJSONField(String fieldName, Object value, ObjectNode node) {
+ node.put(fieldName, (short)value);
+ }
+ };
+
public static final IOptionType<Integer> INTEGER = new IOptionType<Integer>() {
@Override
public Integer parse(String s) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index 470e87c..85731b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -21,6 +21,7 @@
import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
+import static org.apache.hyracks.control.common.config.OptionTypes.SHORT;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
import java.io.File;
@@ -33,6 +34,7 @@
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.util.file.FileUtil;
import org.ini4j.Ini;
@@ -67,7 +69,8 @@
JOB_QUEUE_CAPACITY(INTEGER, 4096),
JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false),
- CORES_MULTIPLIER(INTEGER, 3);
+ CORES_MULTIPLIER(INTEGER, 3),
+ CONTROLLER_ID(SHORT, (short)0x0000);
private final IOptionType parser;
private Object defaultValue;
@@ -164,6 +167,8 @@
+ "bad behaving operators";
case CORES_MULTIPLIER:
return "Specifies the multiplier to use on the cluster available cores";
+ case CONTROLLER_ID:
+ return "The 16-bit (0-65535) id of this Cluster Controller";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -374,4 +379,8 @@
public int getCoresMultiplier() {
return getAppConfig().getInt(Option.CORES_MULTIPLIER);
}
+
+ public CcId getCcId() {
+ return CcId.valueOf(getAppConfig().getShort(Option.CONTROLLER_ID));
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 0a5ba30..95c063f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -21,6 +21,7 @@
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
+import static org.apache.hyracks.control.common.config.OptionTypes.SHORT;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING_ARRAY;
@@ -33,6 +34,7 @@
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.util.file.FileUtil;
@@ -48,6 +50,7 @@
NCSERVICE_PORT(INTEGER, 9090),
CLUSTER_ADDRESS(STRING, (String) null),
CLUSTER_PORT(INTEGER, 1099),
+ CLUSTER_CONTROLLER_ID(SHORT, (short)0x0000),
CLUSTER_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT),
NODE_ID(STRING, (String) null),
@@ -141,6 +144,8 @@
return "Cluster Controller port";
case CLUSTER_LISTEN_PORT:
return "IP port to bind cluster listener";
+ case CLUSTER_CONTROLLER_ID:
+ return "16-bit (0-65535) id of the Cluster Controller";
case CLUSTER_PUBLIC_ADDRESS:
return "Public IP Address to announce cluster listener";
case CLUSTER_PUBLIC_PORT:
@@ -308,6 +313,10 @@
configManager.set(nodeId, Option.CLUSTER_PORT, clusterPort);
}
+ public CcId getClusterControllerId() {
+ return CcId.valueOf(appConfig.getShort(Option.CLUSTER_CONTROLLER_ID));
+ }
+
public String getClusterListenAddress() {
return appConfig.getString(Option.CLUSTER_LISTEN_ADDRESS);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index dca8c07..5c6d078 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -38,6 +38,7 @@
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
@@ -150,12 +151,25 @@
}
- public static abstract class Function implements Serializable {
+ public abstract static class Function implements Serializable {
private static final long serialVersionUID = 1L;
public abstract FunctionId getFunctionId();
}
+ public abstract static class CCIdentifiedFunction extends Function {
+ private static final long serialVersionUID = 1L;
+ private final CcId ccId;
+
+ protected CCIdentifiedFunction(CcId ccId) {
+ this.ccId = ccId;
+ }
+
+ public CcId getCcId() {
+ return ccId;
+ }
+ }
+
public static class RegisterNodeFunction extends Function {
private static final long serialVersionUID = 1L;
@@ -668,24 +682,33 @@
}
}
- //TODO: Add CC id to this job to only abort jobs by this CC: https://issues.apache.org/jira/browse/ASTERIXDB-2110
public static class AbortCCJobsFunction extends Function {
private static final long serialVersionUID = 1L;
+ private final CcId ccId;
+
+ public AbortCCJobsFunction(CcId ccId) {
+ this.ccId = ccId;
+ }
@Override
public FunctionId getFunctionId() {
return FunctionId.ABORT_ALL_JOBS;
}
+
+ public CcId getCcId() {
+ return ccId;
+ }
}
- public static class DeployJobSpecFunction extends Function {
+ public static class DeployJobSpecFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final DeployedJobSpecId deployedJobSpecId;
private final byte[] acgBytes;
- public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes) {
+ public DeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, byte[] acgBytes, CcId ccId) {
+ super(ccId);
this.deployedJobSpecId = deployedJobSpecId;
this.acgBytes = acgBytes;
}
@@ -704,12 +727,13 @@
}
}
- public static class UndeployJobSpecFunction extends Function {
+ public static class UndeployJobSpecFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final DeployedJobSpecId deployedJobSpecId;
- public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId) {
+ public UndeployJobSpecFunction(DeployedJobSpecId deployedJobSpecId, CcId ccId) {
+ super(ccId);
this.deployedJobSpecId = deployedJobSpecId;
}
@@ -724,7 +748,7 @@
}
public static class StartTasksFunction extends Function {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final DeploymentId deploymentId;
private final JobId jobId;
@@ -1008,11 +1032,12 @@
}
}
- public static class ThreadDumpRequestFunction extends Function {
+ public static class ThreadDumpRequestFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final String requestId;
- public ThreadDumpRequestFunction(String requestId) {
+ public ThreadDumpRequestFunction(String requestId, CcId ccId) {
+ super(ccId);
this.requestId = requestId;
}
@@ -1106,13 +1131,14 @@
}
}
- public static class DeployBinaryFunction extends Function {
+ public static class DeployBinaryFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final List<URL> binaryURLs;
private final DeploymentId deploymentId;
- public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs) {
+ public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) {
+ super(ccId);
this.binaryURLs = binaryURLs;
this.deploymentId = deploymentId;
}
@@ -1131,12 +1157,13 @@
}
}
- public static class UnDeployBinaryFunction extends Function {
+ public static class UnDeployBinaryFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final DeploymentId deploymentId;
- public UnDeployBinaryFunction(DeploymentId deploymentId) {
+ public UnDeployBinaryFunction(DeploymentId deploymentId, CcId ccId) {
+ super(ccId);
this.deploymentId = deploymentId;
}
@@ -1211,12 +1238,13 @@
}
}
- public static class StateDumpRequestFunction extends Function {
+ public static class StateDumpRequestFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final String stateDumpId;
- public StateDumpRequestFunction(String stateDumpId) {
+ public StateDumpRequestFunction(String stateDumpId, CcId ccId) {
+ super(ccId);
this.stateDumpId = stateDumpId;
}
@@ -1265,12 +1293,13 @@
}
}
- public static class ShutdownRequestFunction extends Function {
+ public static class ShutdownRequestFunction extends CCIdentifiedFunction {
private static final long serialVersionUID = 1L;
private final boolean terminateNCService;
- public ShutdownRequestFunction(boolean terminateNCService) {
+ public ShutdownRequestFunction(boolean terminateNCService, CcId ccId) {
+ super(ccId);
this.terminateNCService = terminateNCService;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index f2e7d87..e4e2dbe 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.deployment.DeploymentId;
@@ -56,9 +57,11 @@
public class ClusterControllerRemoteProxy implements IClusterController {
+ private final CcId ccId;
private IIPCHandle ipcHandle;
- public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
+ public ClusterControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) {
+ this.ccId = ccId;
this.ipcHandle = ipcHandle;
}
@@ -181,4 +184,14 @@
threadDumpJSON);
ipcHandle.send(-1, tdrf, null);
}
+
+ @Override
+ public CcId getCcId() {
+ return ccId;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + " " + ccId + " [" + ipcHandle.getRemoteAddress() + "]";
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index b6b9b4b..a09a8bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -25,6 +25,7 @@
import java.util.Set;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -51,9 +52,11 @@
import org.apache.hyracks.ipc.api.IIPCHandle;
public class NodeControllerRemoteProxy implements INodeController {
+ private final CcId ccId;
private final IIPCHandle ipcHandle;
- public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
+ public NodeControllerRemoteProxy(CcId ccId, IIPCHandle ipcHandle) {
+ this.ccId = ccId;
this.ipcHandle = ipcHandle;
}
@@ -88,37 +91,37 @@
@Override
public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception {
- DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs);
+ DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs, ccId);
ipcHandle.send(-1, rpaf, null);
}
@Override
public void undeployBinary(DeploymentId deploymentId) throws Exception {
- UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId);
+ UnDeployBinaryFunction rpaf = new UnDeployBinaryFunction(deploymentId, ccId);
ipcHandle.send(-1, rpaf, null);
}
@Override
public void deployJobSpec(DeployedJobSpecId deployedJobSpecId, byte[] planBytes) throws Exception {
- DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes);
+ DeployJobSpecFunction fn = new DeployJobSpecFunction(deployedJobSpecId, planBytes, ccId);
ipcHandle.send(-1, fn, null);
}
@Override
public void undeployJobSpec(DeployedJobSpecId deployedJobSpecId) throws Exception {
- UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId);
+ UndeployJobSpecFunction fn = new UndeployJobSpecFunction(deployedJobSpecId, ccId);
ipcHandle.send(-1, fn, null);
}
@Override
public void dumpState(String stateDumpId) throws Exception {
- StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId);
+ StateDumpRequestFunction dsf = new StateDumpRequestFunction(stateDumpId, ccId);
ipcHandle.send(-1, dsf, null);
}
@Override
public void shutdown(boolean terminateNCService) throws Exception {
- ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService);
+ ShutdownRequestFunction sdrf = new ShutdownRequestFunction(terminateNCService, ccId);
ipcHandle.send(-1, sdrf, null);
}
@@ -131,7 +134,7 @@
@Override
public void takeThreadDump(String requestId) throws Exception {
- ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId);
+ ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId, ccId);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 94e86dd..9670e42 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -58,7 +59,7 @@
}
@Override
- public void onRegisterNode() throws Exception {
+ public void onRegisterNode(CcId ccId) throws Exception {
// no-op
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 8cb33ca..8790434 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -222,15 +222,10 @@
public void close() {
long stillAllocated = memoryAllocation.get();
if (stillAllocated > 0) {
- LOGGER.warn("Freeing leaked " + stillAllocated + " bytes");
+ LOGGER.info(() -> "Freeing leaked " + stillAllocated + " bytes");
serviceCtx.getMemoryManager().deallocate(stillAllocated);
}
- nodeController.getExecutor().execute(new Runnable() {
- @Override
- public void run() {
- deallocatableRegistry.close();
- }
- });
+ nodeController.getExecutor().execute(() -> deallocatableRegistry.close());
}
ByteBuffer allocateFrame() throws HyracksDataException {
@@ -298,7 +293,7 @@
for (PartitionId pid : pids) {
partitionRequestMap.put(pid, collector);
PartitionRequest req = new PartitionRequest(pid, nodeController.getId(), taId, minState);
- nodeController.getClusterController().registerPartitionRequest(req);
+ nodeController.getClusterController(jobId.getCcId()).registerPartitionRequest(req);
}
}
@@ -326,7 +321,7 @@
close();
cleanupPending = false;
try {
- nodeController.getClusterController().notifyJobletCleanup(jobId, nodeController.getId());
+ nodeController.getClusterController(jobId.getCcId()).notifyJobletCleanup(jobId, nodeController.getId());
} catch (Exception e) {
e.printStackTrace();
}
@@ -341,4 +336,5 @@
public ClassLoader getClassLoader() throws HyracksException {
return DeploymentUtils.getClassLoader(deploymentId, serviceCtx);
}
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index b220039..f55e250 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -72,7 +72,8 @@
ncs.getWorkQueue().schedule(new AbortTasksWork(ncs, atf.getJobId(), atf.getTasks()));
return;
case ABORT_ALL_JOBS:
- ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs));
+ CCNCFunctions.AbortCCJobsFunction aajf = (CCNCFunctions.AbortCCJobsFunction) fn;
+ ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs, aajf.getCcId()));
return;
case CLEANUP_JOBLET:
CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
@@ -97,27 +98,29 @@
case DEPLOY_BINARY:
CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
- ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs()));
+ ncs.getWorkQueue()
+ .schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs(), dbf.getCcId()));
return;
case UNDEPLOY_BINARY:
CCNCFunctions.UnDeployBinaryFunction ndbf = (CCNCFunctions.UnDeployBinaryFunction) fn;
- ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId()));
+ ncs.getWorkQueue().schedule(new UnDeployBinaryWork(ncs, ndbf.getDeploymentId(), ndbf.getCcId()));
return;
case DISTRIBUTE_JOB:
CCNCFunctions.DeployJobSpecFunction djf = (CCNCFunctions.DeployJobSpecFunction) fn;
- ncs.getWorkQueue().schedule(new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes()));
+ ncs.getWorkQueue().schedule(
+ new DeployJobSpecWork(ncs, djf.getDeployedJobSpecId(), djf.getacgBytes(), djf.getCcId()));
return;
case DESTROY_JOB:
CCNCFunctions.UndeployJobSpecFunction dsjf = (CCNCFunctions.UndeployJobSpecFunction) fn;
- ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId()));
+ ncs.getWorkQueue().schedule(new UndeployJobSpecWork(ncs, dsjf.getDeployedJobSpecId(), dsjf.getCcId()));
return;
case STATE_DUMP_REQUEST:
final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
- ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId()));
+ ncs.getWorkQueue().schedule(new StateDumpWork(ncs, dsrf.getStateDumpId(), dsrf.getCcId()));
return;
case SHUTDOWN_REQUEST:
@@ -127,7 +130,7 @@
case THREAD_DUMP_REQUEST:
final CCNCFunctions.ThreadDumpRequestFunction tdrf = (CCNCFunctions.ThreadDumpRequestFunction) fn;
- ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId()));
+ ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId(), tdrf.getCcId()));
return;
default:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 18a6b20..24d72f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -29,12 +29,14 @@
import java.lang.management.ThreadMXBean;
import java.net.InetSocketAddress;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@@ -46,6 +48,7 @@
import org.apache.hyracks.api.application.INCApplication;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -90,6 +93,7 @@
import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
import org.apache.hyracks.util.ExitUtil;
+import org.apache.hyracks.util.InvokeUtil;
import org.apache.hyracks.util.PidHelper;
import org.apache.hyracks.util.trace.ITracer;
import org.apache.hyracks.util.trace.Tracer;
@@ -128,7 +132,9 @@
private Exception registrationException;
- private IClusterController ccs;
+ private IClusterController primaryCcs;
+
+ private final Map<CcId, IClusterController> ccsMap = Collections.synchronizedMap(new HashMap<>());
private final Map<JobId, Joblet> jobletMap;
@@ -140,7 +146,9 @@
private NodeParameters nodeParameters;
- private Thread heartbeatThread;
+ private Map<IClusterController, Thread> heartbeatThreads = new ConcurrentHashMap<>();
+
+ private Map<IClusterController, Timer> ccTimers = new ConcurrentHashMap<>();
private final ServerContext serverCtx;
@@ -180,6 +188,8 @@
ExitUtil.init();
}
+ private NCShutdownHook ncShutdownHook;
+
public NodeControllerService(NCConfig config) throws Exception {
this(config, getApplication(config));
}
@@ -201,13 +211,14 @@
LOGGER.info("Setting uncaught exception handler " + getLifeCycleComponentManager());
}
// Set shutdown hook before so it doesn't have the same uncaught exception handler
- Runtime.getRuntime().addShutdownHook(new NCShutdownHook(this));
+ ncShutdownHook = new NCShutdownHook(this);
+ Runtime.getRuntime().addShutdownHook(ncShutdownHook);
Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager());
ioManager =
new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver());
workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
- jobletMap = new Hashtable<>();
+ jobletMap = new ConcurrentHashMap<>();
deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
@@ -235,13 +246,6 @@
return lccm;
}
- synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
- this.nodeParameters = parameters;
- this.registrationException = exception;
- this.registrationPending = false;
- notifyAll();
- }
-
public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<>();
synchronized (getNodeControllerInfosAcceptor) {
@@ -250,7 +254,7 @@
}
getNodeControllerInfosAcceptor.setValue(fv);
}
- ccs.getNodeControllerInfos();
+ primaryCcs.getNodeControllerInfos();
return fv.get();
}
@@ -297,79 +301,142 @@
if (messagingNetManager != null) {
messagingNetManager.start();
}
- this.ccs = new ClusterControllerRemoteProxy(
- ipc.getHandle(
- new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()),
- ncConfig.getClusterConnectRetries(), 1, new IIPCEventListener() {
- @Override
- public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
- // we need to re-register in case of NC -> CC connection reset
- try {
- registerNode();
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "Failed Registering with cc", e);
- throw new IPCException(e);
- }
- }
- }));
- registerNode();
+
+ final InetSocketAddress ccAddress = new InetSocketAddress(ncConfig.getClusterAddress(),
+ ncConfig.getClusterPort());
+ this.primaryCcs = addCc(ncConfig.getClusterControllerId(), ccAddress);
workQueue.start();
// Schedule tracing a human-readable datetime
timer.schedule(new TraceCurrentTimeTask(serviceCtx.getTracer()), 0, 60000);
- if (nodeParameters.getProfileDumpPeriod() > 0) {
- // Schedule profile dump generator.
- timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
- }
-
- // Start heartbeat generator.
- heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()), id + "-Heartbeat");
- heartbeatThread.setPriority(Thread.MAX_PRIORITY);
- heartbeatThread.setDaemon(true);
- heartbeatThread.start();
-
LOGGER.log(Level.INFO, "Started NodeControllerService");
application.startupCompleted();
}
- public void registerNode() throws Exception {
- LOGGER.info("Registering with Cluster Controller");
+ public ClusterControllerRemoteProxy addCc(CcId ccId, InetSocketAddress ccAddress) throws Exception {
+ ClusterControllerRemoteProxy ccProxy;
+ synchronized (ccsMap) {
+ if (ccsMap.containsKey(ccId)) {
+ throw new IllegalStateException("cc already registered: " + ccId);
+ }
+ final IIPCEventListener ipcEventListener = new IIPCEventListener() {
+ @Override
+ public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
+ // we need to re-register in case of NC -> CC connection reset
+ try {
+ registerNode(ccsMap.get(ccId));
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "Failed Registering with cc", e);
+ throw new IPCException(e);
+ }
+ }
+ };
+ ccProxy = new ClusterControllerRemoteProxy(ccId,
+ ipc.getHandle(ccAddress, ncConfig.getClusterConnectRetries(), 1, ipcEventListener));
+ registerNode(ccProxy);
+ ccsMap.put(ccId, ccProxy);
+ }
+ return ccProxy;
+ }
+
+ public void makePrimaryCc(CcId ccId) throws Exception {
+ synchronized (ccsMap) {
+ if (!ccsMap.containsKey(ccId)) {
+ throw new IllegalArgumentException("unknown cc: " + ccId);
+ }
+ primaryCcs = ccsMap.get(ccId);
+ }
+ }
+
+ public void removeCc(CcId ccId) throws Exception {
+ synchronized (ccsMap) {
+ final IClusterController ccs = ccsMap.get(ccId);
+ if (ccs == null) {
+ throw new IllegalArgumentException("unknown cc: " + ccId);
+ }
+ if (primaryCcs.equals(ccs)) {
+ throw new IllegalStateException("cannot remove primary cc: " + ccId);
+ }
+ // TODO(mblow): consider how to handle running jobs
+ ccs.unregisterNode(id);
+ Thread hbThread = heartbeatThreads.remove(ccs);
+ hbThread.interrupt();
+ Timer ccTimer = ccTimers.remove(ccs);
+ if (ccTimer != null) {
+ ccTimer.cancel();
+ }
+ }
+ }
+
+ protected void registerNode(IClusterController ccs) throws Exception {
+ LOGGER.info("Registering with Cluster Controller {}", ccs);
registrationPending = true;
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
- // Use "public" versions of network addresses and ports
+ // Use "public" versions of network addresses and ports, if defined
+ InetSocketAddress ncAddress;
+ if (ncConfig.getClusterPublicPort() == 0) {
+ ncAddress = ipc.getSocketAddress();
+ } else {
+ ncAddress = new InetSocketAddress(ncConfig.getClusterPublicAddress(), ncConfig.getClusterPublicPort());
+ }
NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
NetworkAddress netAddress = netManager.getPublicNetworkAddress();
- NetworkAddress meesagingPort =
+ NetworkAddress messagingAddress =
messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null;
int allCores = osMXBean.getAvailableProcessors();
- nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+ nodeRegistration = new NodeRegistration(ncAddress, id, ncConfig, netAddress, datasetAddress,
osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(),
runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(),
runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
- runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort, application.getCapacity(),
+ runtimeMXBean.getSystemProperties(), hbSchema, messagingAddress, application.getCapacity(),
PidHelper.getPid(), maxJobId.get());
ccs.registerNode(nodeRegistration);
- synchronized (this) {
- while (registrationPending) {
- wait();
- }
+ completeNodeRegistration(ccs);
+
+ // Start heartbeat generator.
+ if (!heartbeatThreads.containsKey(ccs)) {
+ Thread heartbeatThread = new Thread(new HeartbeatTask(ccs, nodeParameters.getHeartbeatPeriod()),
+ id + "-Heartbeat");
+ heartbeatThread.setPriority(Thread.MAX_PRIORITY);
+ heartbeatThread.setDaemon(true);
+ heartbeatThread.start();
+ heartbeatThreads.put(ccs, heartbeatThread);
+ }
+ if (!ccTimers.containsKey(ccs) && nodeParameters.getProfileDumpPeriod() > 0) {
+ Timer ccTimer = new Timer("Timer-" + ccs.getCcId(), true);
+ // Schedule profile dump generator.
+ ccTimer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
+ ccTimers.put(ccs, ccTimer);
+ }
+
+ LOGGER.info("Registering with Cluster Controller {} complete", ccs);
+ }
+
+ synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+ this.nodeParameters = parameters;
+ this.registrationException = exception;
+ this.registrationPending = false;
+ notifyAll();
+ }
+
+ private synchronized void completeNodeRegistration(IClusterController ccs) throws Exception {
+ while (registrationPending) {
+ wait();
}
if (registrationException != null) {
- LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with exception",
- registrationException);
+ LOGGER.log(Level.WARN, "Registering with Cluster Controller failed with exception", registrationException);
throw registrationException;
}
serviceCtx.setDistributedState(nodeParameters.getDistributedState());
- application.onRegisterNode();
- LOGGER.info("Registering with Cluster Controller complete");
+ application.onRegisterNode(ccs.getCcId());
}
private void startApplication() throws Exception {
@@ -404,17 +471,21 @@
workQueue.stop();
application.stop();
/*
- * Stop heartbeat after NC has stopped to avoid false node failure detection
+ * Stop heartbeats only after NC has stopped to avoid false node failure detection
* on CC if an NC takes a long time to stop.
*/
- if (heartbeatThread != null) {
- heartbeatThread.interrupt();
- heartbeatThread.join(1000); // give it 1s to stop gracefully
- }
- try {
- ccs.notifyShutdown(id);
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e);
+ heartbeatThreads.values().parallelStream().forEach(t -> {
+ t.interrupt();
+ InvokeUtil.doUninterruptibly(() -> t.join(1000));
+ });
+ synchronized (ccsMap) {
+ ccsMap.values().parallelStream().forEach(ccs -> {
+ try {
+ ccs.notifyShutdown(id);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e);
+ }
+ });
}
ipc.stop();
@@ -423,6 +494,14 @@
LOGGER.log(Level.ERROR, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack),
new Exception("Duplicate shutdown call"));
}
+ if (ncShutdownHook != null) {
+ try {
+ Runtime.getRuntime().removeShutdownHook(ncShutdownHook);
+ LOGGER.info("removed shutdown hook for {}", id);
+ } catch (IllegalStateException e) {
+ LOGGER.log(Level.DEBUG, "ignoring exception while attempting to remove shutdown hook", e);
+ }
+ }
}
public String getId() {
@@ -488,8 +567,12 @@
return partitionManager;
}
- public IClusterController getClusterController() {
- return ccs;
+ public IClusterController getPrimaryClusterController() {
+ return primaryCcs;
+ }
+
+ public IClusterController getClusterController(CcId ccId) {
+ return ccsMap.get(ccId);
}
public NodeParameters getNodeParameters() {
@@ -619,7 +702,7 @@
public void run() {
try {
FutureValue<List<JobProfile>> fv = new FutureValue<>();
- BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
+ BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, cc.getCcId(), fv);
workQueue.scheduleAndSync(bjpw);
List<JobProfile> profiles = fv.get();
if (!profiles.isEmpty()) {
@@ -651,8 +734,8 @@
}
}
- public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId) throws Exception {
- ccs.sendApplicationMessageToCC(data, deploymentId, id);
+ public void sendApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception {
+ ccsMap.get(ccId).sendApplicationMessageToCC(data, deploymentId, id);
}
public IDatasetPartitionManager getDatasetPartitionManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 34ddd6a..07bb504 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -437,12 +437,13 @@
@Override
public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception {
- this.ncs.sendApplicationMessageToCC(message, deploymentId);
+ this.ncs.sendApplicationMessageToCC(getJobletContext().getJobId().getCcId(), message, deploymentId);
}
@Override
public void sendApplicationMessageToCC(Serializable message, DeploymentId deploymentId) throws Exception {
- this.ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), deploymentId);
+ this.ncs.sendApplicationMessageToCC(getJobletContext().getJobId().getCcId(),
+ JavaSerializationUtils.serialize(message), deploymentId);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 476aeae..fb7308e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -93,10 +93,10 @@
boolean orderedResult, boolean emptyResult) throws HyracksException {
try {
// Be sure to send the *public* network address to the CC
- ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
- partition, nPartitions, ncs.getDatasetNetworkManager().getPublicNetworkAddress());
+ ncs.getClusterController(jobId.getCcId()).registerResultPartitionLocation(jobId, rsId, orderedResult,
+ emptyResult, partition, nPartitions, ncs.getDatasetNetworkManager().getPublicNetworkAddress());
} catch (Exception e) {
- throw new HyracksException(e);
+ throw HyracksException.create(e);
}
}
@@ -105,9 +105,9 @@
try {
LOGGER.debug("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId
+ ":partition: " + partition);
- ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
+ ncs.getClusterController(jobId.getCcId()).reportResultPartitionWriteCompletion(jobId, rsId, partition);
} catch (Exception e) {
- throw new HyracksException(e);
+ throw HyracksException.create(e);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index b9d2f4d..4787a50 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -96,7 +96,7 @@
ctx.getIoManager().close(handle);
}
if (!failed) {
- manager.registerPartition(pid, taId,
+ manager.registerPartition(pid, ctx.getJobletContext().getJobId().getCcId(), taId,
new MaterializedPartition(ctx, fRef, executor, ctx.getIoManager()),
PartitionState.COMMITTED, taId.getAttempt() == 0 ? false : true);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 57eba53..147606d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -188,7 +188,8 @@
eos = false;
failed = false;
deallocated = false;
- manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
+ manager.registerPartition(pid, ctx.getJobletContext().getJobId().getCcId(), taId, this, PartitionState.STARTED,
+ false);
}
private void checkOrCreateFile() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
index 667cfa3..bb69eec 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PartitionManager.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -58,10 +59,10 @@
this.fileFactory = new WorkspaceFileFactory(deallocatableRegistry, ncs.getIoManager());
}
- public synchronized void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition,
+ public synchronized void registerPartition(PartitionId pid, CcId ccId, TaskAttemptId taId, IPartition partition,
PartitionState state, boolean updateToCC) throws HyracksDataException {
try {
- /**
+ /*
* process pending requests
*/
NetworkOutputChannel writer = partitionRequests.remove(pid);
@@ -73,24 +74,20 @@
}
}
- /**
+ /*
* put a coming available partition into the available partition map
*/
- List<IPartition> pList = availablePartitionMap.get(pid);
- if (pList == null) {
- pList = new ArrayList<>();
- availablePartitionMap.put(pid, pList);
- }
+ List<IPartition> pList = availablePartitionMap.computeIfAbsent(pid, k -> new ArrayList<>());
pList.add(partition);
- /**
+ /*
* update to CC only when necessary
*/
if (updateToCC) {
- updatePartitionState(pid, taId, partition, state);
+ updatePartitionState(ccId, pid, taId, partition, state);
}
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -128,7 +125,7 @@
partitionRequests.put(partitionId, writer);
}
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
@@ -140,14 +137,15 @@
deallocatableRegistry.close();
}
- public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+ public void updatePartitionState(CcId ccId, PartitionId pid, TaskAttemptId taId, IPartition partition,
+ PartitionState state)
throws HyracksDataException {
PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
desc.setState(state);
try {
- ncs.getClusterController().registerPartitionProvider(desc);
+ ncs.getClusterController(ccId).registerPartitionProvider(desc);
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
index 16e5027..fc2f8e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -71,7 +71,8 @@
@Override
public void open() throws HyracksDataException {
- manager.registerPartition(pid, taId, this, PartitionState.STARTED, false);
+ manager.registerPartition(pid, ctx.getJobletContext().getJobId().getCcId(), taId, this, PartitionState.STARTED,
+ false);
pendingConnection = true;
ensureConnected();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
index f43dcbc..4969a85 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
@@ -18,8 +18,9 @@
*/
package org.apache.hyracks.control.nc.task;
-import org.apache.hyracks.util.ThreadDumpUtil;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -28,10 +29,12 @@
private static final Logger LOGGER = LogManager.getLogger();
private final NodeControllerService ncs;
private final String requestId;
+ private final CcId ccId;
- public ThreadDumpTask(NodeControllerService ncs, String requestId) {
+ public ThreadDumpTask(NodeControllerService ncs, String requestId, CcId ccId) {
this.ncs = ncs;
this.requestId = requestId;
+ this.ccId = ccId;
}
@Override
@@ -44,8 +47,7 @@
result = null;
}
try {
- ncs.getClusterController().notifyThreadDump(
- ncs.getContext().getNodeId(), requestId, result);
+ ncs.getClusterController(ccId).notifyThreadDump(ncs.getContext().getNodeId(), requestId, result);
} catch (Exception e) {
LOGGER.log(Level.WARN, "Exception sending thread dump to CC", e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index 6132639..68d677f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -20,6 +20,7 @@
import java.util.Collection;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.control.common.work.SynchronizableWork;
@@ -34,24 +35,29 @@
private static final Logger LOGGER = LogManager.getLogger();
private final NodeControllerService ncs;
+ private final CcId ccId;
- public AbortAllJobsWork(NodeControllerService ncs) {
+ public AbortAllJobsWork(NodeControllerService ncs, CcId ccId) {
this.ncs = ncs;
+ this.ccId = ccId;
}
@Override
protected void doRun() throws Exception {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Aborting all tasks");
- }
+ LOGGER.info("Aborting all tasks for controller {}", ccId);
IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
- if (dpm != null) {
- ncs.getDatasetPartitionManager().abortAllReaders();
- } else {
+ if (dpm == null) {
LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId());
}
Collection<Joblet> joblets = ncs.getJobletMap().values();
for (Joblet ji : joblets) {
+ // TODO(mblow): should we have one jobletmap per cc?
+ if (!ji.getJobId().getCcId().equals(ccId)) {
+ continue;
+ }
+ if (dpm != null) {
+ dpm.abortReader(ji.getJobId());
+ }
Collection<Task> tasks = ji.getTaskMap().values();
for (Task task : tasks) {
task.abort();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java
index 582f058..0dd5d4e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/BuildJobProfilesWork.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
@@ -33,20 +34,21 @@
public class BuildJobProfilesWork extends SynchronizableWork {
private final NodeControllerService ncs;
+ private final CcId ccId;
private final FutureValue<List<JobProfile>> fv;
- public BuildJobProfilesWork(NodeControllerService ncs, FutureValue<List<JobProfile>> fv) {
+ public BuildJobProfilesWork(NodeControllerService ncs, CcId ccId, FutureValue<List<JobProfile>> fv) {
this.ncs = ncs;
+ this.ccId = ccId;
this.fv = fv;
}
@Override
protected void doRun() throws Exception {
- List<JobProfile> profiles = new ArrayList<JobProfile>();
+ List<JobProfile> profiles = new ArrayList<>();
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
- for (Joblet ji : jobletMap.values()) {
- profiles.add(new JobProfile(ji.getJobId()));
- }
+ jobletMap.values().stream().filter(ji -> ji.getJobId().getCcId().equals(ccId))
+ .forEach(ji -> profiles.add(new JobProfile(ji.getJobId())));
for (JobProfile jProfile : profiles) {
Joblet ji;
JobletProfile jobletProfile = new JobletProfile(ncs.getId());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
index 0fe55e6..d1385ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
@@ -22,6 +22,7 @@
import java.net.URL;
import java.util.List;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.control.common.base.IClusterController;
import org.apache.hyracks.control.common.deployment.DeploymentStatus;
@@ -40,11 +41,13 @@
private DeploymentId deploymentId;
private NodeControllerService ncs;
private List<URL> binaryURLs;
+ private final CcId ccId;
- public DeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, List<URL> binaryURLs) {
+ public DeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) {
this.deploymentId = deploymentId;
this.ncs = ncs;
this.binaryURLs = binaryURLs;
+ this.ccId = ccId;
}
@Override
@@ -59,7 +62,7 @@
e.printStackTrace();
}
try {
- IClusterController ccs = ncs.getClusterController();
+ IClusterController ccs = ncs.getClusterController(ccId);
ccs.notifyDeployBinary(deploymentId, ncs.getId(), status);
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
index 4276b67..92612dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployJobSpecWork.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.nc.work;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.DeployedJobSpecId;
@@ -34,12 +35,15 @@
private final NodeControllerService ncs;
private final byte[] acgBytes;
+ private final CcId ccId;
private final DeployedJobSpecId deployedJobSpecId;
- public DeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId deployedJobSpecId, byte[] acgBytes) {
+ public DeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId deployedJobSpecId, byte[] acgBytes,
+ CcId ccId) {
this.ncs = ncs;
this.deployedJobSpecId = deployedJobSpecId;
this.acgBytes = acgBytes;
+ this.ccId = ccId;
}
@Override
@@ -51,7 +55,7 @@
ncs.storeActivityClusterGraph(deployedJobSpecId, acg);
} catch (HyracksException e) {
try {
- ncs.getClusterController().notifyDeployedJobSpecFailure(deployedJobSpecId, ncs.getId());
+ ncs.getClusterController(ccId).notifyDeployedJobSpecFailure(deployedJobSpecId, ncs.getId());
} catch (Exception e1) {
e1.printStackTrace();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index 449d9a3..614a9e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -41,8 +41,8 @@
TaskProfile taskProfile =
new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile(), task.getStatsCollector());
try {
- ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
- ncs.getId(), taskProfile);
+ ncs.getClusterController(task.getJobletContext().getJobId().getCcId()).notifyTaskComplete(
+ task.getJobletContext().getJobId(), task.getTaskAttemptId(), ncs.getId(), taskProfile);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Failed notifying task complete for " + task.getTaskAttemptId(), e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index 1d6ae1b..f0b68a0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -56,7 +56,7 @@
if (dpm != null) {
dpm.abortReader(jobId);
}
- ncs.getClusterController().notifyTaskFailure(jobId, taskId, ncs.getId(), exceptions);
+ ncs.getClusterController(jobId.getCcId()).notifyTaskFailure(jobId, taskId, ncs.getId(), exceptions);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Failure reporting task failure to cluster controller", e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java
index 9dbc901..0b3895f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StateDumpWork.java
@@ -20,6 +20,7 @@
import java.io.ByteArrayOutputStream;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.control.common.work.SynchronizableWork;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -27,16 +28,19 @@
private final NodeControllerService ncs;
private final String stateDumpId;
+ private final CcId ccId;
- public StateDumpWork(NodeControllerService ncs, String stateDumpId) {
+ public StateDumpWork(NodeControllerService ncs, String stateDumpId, CcId ccId) {
this.ncs = ncs;
this.stateDumpId = stateDumpId;
+ this.ccId = ccId;
}
@Override
protected void doRun() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ncs.getContext().getStateDumpHandler().dumpState(baos);
- ncs.getClusterController().notifyStateDump(ncs.getContext().getNodeId(), stateDumpId, baos.toString("UTF-8"));
+ ncs.getClusterController(ccId).notifyStateDump(ncs.getContext().getNodeId(), stateDumpId,
+ baos.toString("UTF-8"));
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java
index 1c10589..9b862b2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UnDeployBinaryWork.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.nc.work;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.control.common.base.IClusterController;
import org.apache.hyracks.control.common.deployment.DeploymentStatus;
@@ -35,10 +36,12 @@
private DeploymentId deploymentId;
private NodeControllerService ncs;
+ private final CcId ccId;
- public UnDeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId) {
+ public UnDeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, CcId ccId) {
this.deploymentId = deploymentId;
this.ncs = ncs;
+ this.ccId = ccId;
}
@Override
@@ -52,7 +55,7 @@
status = DeploymentStatus.FAIL;
}
try {
- IClusterController ccs = ncs.getClusterController();
+ IClusterController ccs = ncs.getClusterController(ccId);
ccs.notifyDeployBinary(deploymentId, ncs.getId(), status);
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
index 4383ff6..afc4206 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/UndeployJobSpecWork.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.nc.work;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.DeployedJobSpecId;
import org.apache.hyracks.control.common.work.AbstractWork;
@@ -32,10 +33,12 @@
private final NodeControllerService ncs;
private final DeployedJobSpecId deployedJobSpecId;
+ private final CcId ccId;
- public UndeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId deployedJobSpecId) {
+ public UndeployJobSpecWork(NodeControllerService ncs, DeployedJobSpecId deployedJobSpecId, CcId ccId) {
this.ncs = ncs;
this.deployedJobSpecId = deployedJobSpecId;
+ this.ccId = ccId;
}
@Override
@@ -44,7 +47,7 @@
ncs.removeActivityClusterGraph(deployedJobSpecId);
} catch (HyracksException e) {
try {
- ncs.getClusterController().notifyDeployedJobSpecFailure(deployedJobSpecId, ncs.getId());
+ ncs.getClusterController(ccId).notifyDeployedJobSpecFailure(deployedJobSpecId, ncs.getId());
} catch (Exception e1) {
e1.printStackTrace();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
index 780a65c..15248e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -45,7 +46,7 @@
}
@Override
- public void onRegisterNode() throws Exception {
+ public void onRegisterNode(CcId ccs) throws Exception {
// No-op
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
index 478138e..2aeab81 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java
@@ -56,12 +56,6 @@
}
@Test
- public void waitForNonExistingJob() throws Exception {
- JobId jobId = new JobId(Long.MAX_VALUE);
- waitForCompletion(jobId, "has not been created yet");
- }
-
- @Test
public void failureOnInit() throws Exception {
JobSpecification spec = new JobSpecification();
connectToSinkAndRun(spec, new FailOnInitializeDeInitializeOperatorDescriptor(spec, true, false),
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
index e310385..1d38d3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java
@@ -30,7 +30,6 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatable;
-import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
import org.apache.hyracks.control.nc.resources.memory.FrameManager;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InvokeUtil.java
similarity index 98%
rename from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
rename to hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InvokeUtil.java
index 9bdf55c..b8d8ce4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InvokeUtil.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.utils;
+package org.apache.hyracks.util;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/trace/Tracer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/trace/Tracer.java
index 79922b7..5313514 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/trace/Tracer.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/trace/Tracer.java
@@ -38,7 +38,8 @@
protected static final Level TRACE_LOG_LEVEL = Level.INFO;
protected static final String CAT = "Tracer";
- protected static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+ protected static final ThreadLocal<DateFormat> DATE_FORMAT = ThreadLocal
+ .withInitial(() -> new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
protected final Logger traceLog;
protected long categories;
@@ -67,9 +68,7 @@
}
public static String dateTimeStamp() {
- synchronized (DATE_FORMAT) {
- return "{\"datetime\":\"" + DATE_FORMAT.format(new Date()) + "\"}";
- }
+ return "{\"datetime\":\"" + DATE_FORMAT.get().format(new Date()) + "\"}";
}
@Override