[NO ISSUE][RT] Abort CC jobs on first time registration
- user model changes: no
- storage format changes: no
- interface changes: yes
- application context is in charge of providing and renewing cc client
connection.
details:
- This change allows revival of cc if it gets killed.
- Jobs that were started by this cc are aborted and cleaned up on all ncs
upon first time registration.
- client connections are repaired on ncs when dead connection
is detected.
Change-Id: If755b7131bdc91790ed28be66f0c61b51f28c2fa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2026
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index c38b3fc..3912bd5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -24,6 +24,8 @@
import java.io.PrintWriter;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IApplicationContext;
@@ -33,8 +35,10 @@
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.client.dataset.HyracksDataset;
import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.ipc.exceptions.IPCException;
public class AbstractQueryApiServlet extends AbstractServlet {
+ private static final Logger LOGGER = Logger.getLogger(AbstractQueryApiServlet.class.getName());
protected final IApplicationContext appCtx;
public enum ResultFields {
@@ -99,9 +103,19 @@
}
protected IHyracksDataset getHyracksDataset() throws Exception { // NOSONAR
- synchronized (ctx) {
- IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
- if (hds == null) {
+ try {
+ return doGetHyracksDataset();
+ } catch (IPCException e) {
+ LOGGER.log(Level.WARNING, "Failed getting hyracks dataset connection. Resetting hyracks connection.", e);
+ ctx.put(HYRACKS_CONNECTION_ATTR, appCtx.getHcc());
+ return doGetHyracksDataset();
+ }
+ }
+
+ protected IHyracksDataset doGetHyracksDataset() throws Exception {
+ IHyracksDataset hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
+ if (hds == null) {
+ synchronized (ctx) {
hds = (IHyracksDataset) ctx.get(HYRACKS_DATASET_ATTR);
if (hds == null) {
hds = new HyracksDataset(getHyracksClientConnection(),
@@ -109,18 +123,16 @@
ctx.put(HYRACKS_DATASET_ATTR, hds);
}
}
- return hds;
}
+ return hds;
}
protected IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
- synchronized (ctx) {
- final IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
- if (hcc == null) {
- throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR);
- }
- return hcc;
+ IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+ if (hcc == null) {
+ throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR);
}
+ return hcc;
}
protected static UUID printRequestId(PrintWriter pw) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index da06dd1..616c22e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -36,6 +36,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.messaging.api.MessageFuture;
@@ -45,6 +46,7 @@
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.ipc.exceptions.IPCException;
@@ -64,9 +66,9 @@
}
@Override
- protected void executeStatement(String statementsText,
- SessionOutput sessionOutput, IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats,
- RequestParameters param, long[] outExecStartEnd, Map<String, String> optionalParameters) throws Exception {
+ protected void executeStatement(String statementsText, SessionOutput sessionOutput,
+ IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param,
+ long[] outExecStartEnd, Map<String, String> optionalParameters) throws Exception {
// Running on NC -> send 'execute' message to CC
INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker();
@@ -83,10 +85,9 @@
if (param.timeout != null) {
timeout = TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.timeout));
}
- ExecuteStatementRequestMessage requestMsg =
- new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage,
- statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl,
- optionalParameters);
+ ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(),
+ responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), ccDelivery,
+ param.clientContextID, handleUrl, optionalParameters);
outExecStartEnd[0] = System.nanoTime();
ncMb.sendMessageToCC(requestMsg);
try {
@@ -148,7 +149,8 @@
@Override
protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
- if (t instanceof IPCException || t instanceof TimeoutException) {
+ if (t instanceof TimeoutException
+ || (t instanceof HyracksDataException && ExceptionUtils.getRootCause(t) instanceof IPCException)) {
GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, t.toString(), t);
return HttpResponseStatus.SERVICE_UNAVAILABLE;
} else {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 901aff8..cce5099 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -61,7 +61,6 @@
IHyracksDataset hds = getHyracksDataset();
ResultReader resultReader = new ResultReader(hds, handle.getJobId(), handle.getResultSetId());
-
try {
DatasetJobRecord.Status status = resultReader.getStatus();
@@ -98,7 +97,7 @@
ResultUtil.printResults(appCtx, resultReader, sessionOutput, new Stats(), null);
} catch (HyracksDataException e) {
final int errorCode = e.getErrorCode();
- if (ErrorCode.NO_RESULTSET == errorCode) {
+ if (ErrorCode.NO_RESULT_SET == errorCode) {
LOGGER.log(Level.INFO, "No results for: \"" + strHandle + "\"");
response.setStatus(HttpResponseStatus.NOT_FOUND);
return;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index eedc8ec..3e0f2c6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -76,10 +76,14 @@
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
+import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -119,14 +123,11 @@
private IBufferCache bufferCache;
private ITransactionSubsystem txnSubsystem;
private IMetadataNode metadataNodeStub;
-
private ILSMIOOperationScheduler lsmIOScheduler;
private PersistentLocalResourceRepository localResourceRepository;
private IIOManager ioManager;
private boolean isShuttingdown;
-
private ActiveManager activeManager;
-
private IReplicationChannel replicationChannel;
private IReplicationManager replicationManager;
private IRemoteRecoveryManager remoteRecoveryManager;
@@ -134,6 +135,7 @@
private final ILibraryManager libraryManager;
private final NCExtensionManager ncExtensionManager;
private final IStorageComponentProvider componentProvider;
+ private IHyracksClientConnection hcc;
public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions)
throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
@@ -485,4 +487,22 @@
public INCServiceContext getServiceContext() {
return ncServiceContext;
}
+
+ @Override
+ public IHyracksClientConnection getHcc() throws HyracksDataException {
+ if (hcc == null || !hcc.isConnected()) {
+ synchronized (this) {
+ if (hcc == null || !hcc.isConnected()) {
+ try {
+ NodeControllerService ncSrv = (NodeControllerService) ncServiceContext.getControllerService();
+ ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo();
+ hcc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort());
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+ }
+ return hcc;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index e8f63b4..3d7f870 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -48,9 +48,6 @@
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
-import org.apache.hyracks.api.client.ClusterControllerInfo;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileDeviceResolver;
@@ -81,7 +78,7 @@
@Override
public void init(IServiceContext serviceCtx) throws Exception {
- this.ncServiceCtx = (INCServiceContext) serviceCtx;
+ ncServiceCtx = (INCServiceContext) serviceCtx;
ncServiceCtx.setThreadFactory(
new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), ncServiceCtx.getLifeCycleComponentManager()));
}
@@ -103,7 +100,7 @@
System.setProperty("java.rmi.server.hostname",
(controllerService).getConfiguration().getClusterPublicAddress());
}
- runtimeContext = new NCAppRuntimeContext(this.ncServiceCtx, getExtensions());
+ runtimeContext = new NCAppRuntimeContext(ncServiceCtx, getExtensions());
MetadataProperties metadataProperties = runtimeContext.getMetadataProperties();
if (!metadataProperties.getNodeNames().contains(this.ncServiceCtx.getNodeId())) {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -115,8 +112,8 @@
MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
this.ncServiceCtx.setMessageBroker(messageBroker);
- MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory(
- (NCMessageBroker) messageBroker, messagingProperties);
+ MessagingChannelInterfaceFactory interfaceFactory =
+ new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties);
this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -228,8 +225,8 @@
String[] ioDevices = ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
.getStorageMountingPoints();
for (String ioDevice : ioDevices) {
- String tempDatasetsDir = ioDevice + storageDirName + File.separator
- + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
+ String tempDatasetsDir =
+ ioDevice + storageDirName + File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
File tmpDsDir = new File(tempDatasetsDir);
if (tmpDsDir.exists()) {
IoUtil.delete(tmpDsDir);
@@ -307,10 +304,4 @@
return devices.get(ioDeviceIndex);
};
}
-
- protected IHyracksClientConnection getHcc() throws Exception {
- NodeControllerService ncSrv = (NodeControllerService) ncServiceCtx.getControllerService();
- ClusterControllerInfo ccInfo = ncSrv.getNodeParameters().getClusterControllerInfo();
- return new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort());
- }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index e1840d3..4e30d54 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -30,6 +30,8 @@
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IApplicationContext {
@@ -56,7 +58,17 @@
/**
* @return the library manager which implements {@link org.apache.asterix.common.library.ILibraryManager}
*/
- public ILibraryManager getLibraryManager();
+ ILibraryManager getLibraryManager();
+ /**
+ * @return the service context
+ */
IServiceContext getServiceContext();
+
+ /**
+ * @return a connected instance of {@link IHyracksClientConnection}
+ * @throws HyracksDataException
+ * if connection couldn't be established to cluster controller
+ */
+ IHyracksClientConnection getHcc() throws HyracksDataException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 20f685a..690d1fd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -27,7 +27,6 @@
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.hyracks.api.application.ICCServiceContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.storage.common.IStorageManager;
@@ -71,11 +70,6 @@
IJobLifecycleListener getActiveNotificationHandler();
/**
- * @return a new instance of {@link IHyracksClientConnection}
- */
- IHyracksClientConnection getHcc();
-
- /**
* @return the cluster wide resource id manager
*/
IResourceIdManager getResourceIdManager();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index e4cc7f4..855031e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -45,7 +45,9 @@
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.asterix.runtime.transaction.ResourceIdManager;
import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.storage.common.IStorageManager;
@@ -155,7 +157,18 @@
}
@Override
- public IHyracksClientConnection getHcc() {
+ public IHyracksClientConnection getHcc() throws HyracksDataException {
+ if (!hcc.isConnected()) {
+ synchronized (this) {
+ if (!hcc.isConnected()) {
+ try {
+ hcc = new HyracksConnection(hcc.getHost(), hcc.getPort());
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+ }
return hcc;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 0142c7d..0ded84f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -70,8 +70,8 @@
@Override
public void cancelJob(JobId jobId) throws Exception {
- HyracksClientInterfaceFunctions.CancelJobFunction cjf = new HyracksClientInterfaceFunctions.CancelJobFunction(
- jobId);
+ HyracksClientInterfaceFunctions.CancelJobFunction cjf =
+ new HyracksClientInterfaceFunctions.CancelJobFunction(jobId);
rpci.call(ipcHandle, cjf);
}
@@ -84,8 +84,8 @@
@Override
public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception {
- HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
- deploymentId, acggfBytes, jobFlags);
+ HyracksClientInterfaceFunctions.StartJobFunction sjf =
+ new HyracksClientInterfaceFunctions.StartJobFunction(deploymentId, acggfBytes, jobFlags);
return (JobId) rpci.call(ipcHandle, sjf);
}
@@ -165,8 +165,8 @@
}
}
if (ipcHandle.isConnected()) {
- throw new IPCException("CC refused to release connection after " + SHUTDOWN_CONNECTION_TIMEOUT_SECS
- + " seconds");
+ throw new IPCException(
+ "CC refused to release connection after " + SHUTDOWN_CONNECTION_TIMEOUT_SECS + " seconds");
}
}
@@ -181,6 +181,11 @@
public String getThreadDump(String node) throws Exception {
HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
new HyracksClientInterfaceFunctions.ThreadDumpFunction(node);
- return (String)rpci.call(ipcHandle, tdf);
+ return (String) rpci.call(ipcHandle, tdf);
+ }
+
+ @Override
+ public boolean isConnected() {
+ return ipcHandle.isConnected();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 75cbf61..e979da6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -44,7 +44,6 @@
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.api.util.JavaSerializationUtils;
-import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.RPCInterface;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
@@ -58,6 +57,8 @@
public final class HyracksConnection implements IHyracksClientConnection {
private final String ccHost;
+ private final int ccPort;
+
private final IPCSystem ipc;
private final IHyracksClientInterface hci;
@@ -77,11 +78,11 @@
*/
public HyracksConnection(String ccHost, int ccPort) throws Exception {
this.ccHost = ccHost;
+ this.ccPort = ccPort;
RPCInterface rpci = new RPCInterface();
ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
ipc.start();
- IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
- this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle, rpci);
+ hci = new HyracksClientInterfaceRemoteProxy(ipc.getHandle(new InetSocketAddress(ccHost, ccPort)), rpci);
ccInfo = hci.getClusterControllerInfo();
}
@@ -124,6 +125,7 @@
return hci.startJob(jobId);
}
+ @Override
public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
}
@@ -132,6 +134,7 @@
return hci.distributeJob(JavaSerializationUtils.serialize(acggf));
}
+ @Override
public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception {
return hci.getDatasetDirectoryServiceInfo();
}
@@ -242,4 +245,19 @@
public String getThreadDump(String node) throws Exception {
return hci.getThreadDump(node);
}
+
+ @Override
+ public String getHost() {
+ return ccHost;
+ }
+
+ @Override
+ public int getPort() {
+ return ccPort;
+ }
+
+ @Override
+ public boolean isConnected() {
+ return hci.isConnected();
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 0956d85..0189135 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -44,7 +44,7 @@
* @return {@link JobStatus}
* @throws Exception
*/
- public JobStatus getJobStatus(JobId jobId) throws Exception;
+ JobStatus getJobStatus(JobId jobId) throws Exception;
/**
* Gets detailed information about the specified Job.
@@ -54,7 +54,7 @@
* @return {@link JobStatus}
* @throws Exception
*/
- public JobInfo getJobInfo(JobId jobId) throws Exception;
+ JobInfo getJobInfo(JobId jobId) throws Exception;
/**
* Cancel the job that has the given job id.
@@ -63,7 +63,7 @@
* the JobId of the Job
* @throws Exception
*/
- public void cancelJob(JobId jobId) throws Exception;
+ void cancelJob(JobId jobId) throws Exception;
/**
* Start the specified Job.
@@ -72,7 +72,7 @@
* Job Specification
* @throws Exception
*/
- public JobId startJob(JobSpecification jobSpec) throws Exception;
+ JobId startJob(JobSpecification jobSpec) throws Exception;
/**
* Start the specified Job.
@@ -83,7 +83,7 @@
* Flags
* @throws Exception
*/
- public JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+ JobId startJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
/**
* Distribute the specified Job.
@@ -94,7 +94,7 @@
* Flags
* @throws Exception
*/
- public JobId distributeJob(JobSpecification jobSpec) throws Exception;
+ JobId distributeJob(JobSpecification jobSpec) throws Exception;
/**
* Destroy the distributed graph for a pre-distributed job
@@ -103,7 +103,7 @@
* The id of the predistributed job
* @throws Exception
*/
- public JobId destroyJob(JobId jobId) throws Exception;
+ JobId destroyJob(JobId jobId) throws Exception;
/**
* Used to run a pre-distributed job by id (the same JobId will be returned)
@@ -112,7 +112,7 @@
* The id of the predistributed job
* @throws Exception
*/
- public JobId startJob(JobId jobId) throws Exception;
+ JobId startJob(JobId jobId) throws Exception;
/**
* Start the specified Job.
@@ -123,7 +123,7 @@
* Flags
* @throws Exception
*/
- public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception;
+ JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception;
/**
* Gets the IP Address and port for the DatasetDirectoryService wrapped in NetworkAddress
@@ -131,7 +131,7 @@
* @return {@link NetworkAddress}
* @throws Exception
*/
- public NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
+ NetworkAddress getDatasetDirectoryServiceInfo() throws Exception;
/**
* Waits until the specified job has completed, either successfully or has
@@ -141,8 +141,7 @@
* JobId of the Job
* @throws Exception
*/
- public void waitForCompletion(JobId jobId) throws Exception;
-
+ void waitForCompletion(JobId jobId) throws Exception;
/**
* Deploy the user-defined jars to the cluster
@@ -150,7 +149,7 @@
* @param jars
* a list of user-defined jars
*/
- public DeploymentId deployBinary(List<String> jars) throws Exception;
+ DeploymentId deployBinary(List<String> jars) throws Exception;
/**
* undeploy a certain deployment
@@ -158,7 +157,7 @@
* @param deploymentId
* the id for the deployment to be undeployed
*/
- public void unDeployBinary(DeploymentId deploymentId) throws Exception;
+ void unDeployBinary(DeploymentId deploymentId) throws Exception;
/**
* Start the specified Job.
@@ -169,7 +168,7 @@
* Job Specification
* @throws Exception
*/
- public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
+ JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec) throws Exception;
/**
* Start the specified Job.
@@ -182,8 +181,7 @@
* Flags
* @throws Exception
*/
- public JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags)
- throws Exception;
+ JobId startJob(DeploymentId deploymentId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
/**
* Start the specified Job.
@@ -196,27 +194,45 @@
* Flags
* @throws Exception
*/
- public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
- EnumSet<JobFlag> jobFlags) throws Exception;
+ JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+ throws Exception;
/**
* Shuts down all NCs and then the CC.
+ *
* @param terminateNCService
*/
- public void stopCluster(boolean terminateNCService) throws Exception;
+ void stopCluster(boolean terminateNCService) throws Exception;
/**
* Get details of specified node as JSON object
+ *
* @param nodeId
- * id the subject node
+ * id the subject node
* @param includeStats
- * @param includeConfig @return serialized JSON containing the node details
+ * @param includeConfig
+ * @return serialized JSON containing the node details
* @throws Exception
*/
- public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception;
+ String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception;
/**
* Gets thread dump from the specified node as a serialized JSON string
*/
- public String getThreadDump(String node) throws Exception;
+ String getThreadDump(String node) throws Exception;
+
+ /**
+ * @return true if the connection is alive, false otherwise
+ */
+ boolean isConnected();
+
+ /**
+ * @return the hostname of the cluster controller
+ */
+ String getHost();
+
+ /**
+ * @return the port of the cluster controller
+ */
+ int getPort();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 1afbe9e..9cebd3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -67,4 +67,6 @@
public String getNodeDetailsJSON(String nodeId, boolean includeStats, boolean includeConfig) throws Exception;
public String getThreadDump(String node) throws Exception;
+
+ public boolean isConnected();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 68e7cd1..4bb2869 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -57,7 +57,7 @@
public static final int ERROR_FINDING_DISTRIBUTED_JOB = 21;
public static final int DUPLICATE_DISTRIBUTED_JOB = 22;
public static final int DISTRIBUTED_JOB_FAILURE = 23;
- public static final int NO_RESULTSET = 24;
+ public static final int NO_RESULT_SET = 24;
public static final int JOB_CANCELED = 25;
public static final int NODE_FAILED = 26;
public static final int FILE_IS_NOT_DIRECTORY = 27;
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
index fdac7f1..31fd379 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/dataset/HyracksDatasetReader.java
@@ -93,7 +93,7 @@
try {
return datasetDirectoryServiceConnection.getDatasetResultStatus(jobId, resultSetId);
} catch (HyracksDataException e) {
- if (e.getErrorCode() != ErrorCode.NO_RESULTSET) {
+ if (e.getErrorCode() != ErrorCode.NO_RESULT_SET) {
LOGGER.log(Level.WARNING, "Exception retrieving result set for job " + jobId, e);
}
} catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 8400a59..7be6524 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -28,12 +28,12 @@
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.resource.NodeCapacity;
-import org.apache.hyracks.control.common.base.INodeController;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -41,7 +41,7 @@
public class NodeControllerState {
private static final int RRD_SIZE = 720;
- private final INodeController nodeController;
+ private final NodeControllerRemoteProxy nodeController;
private final NCConfig ncConfig;
@@ -145,7 +145,7 @@
private NodeCapacity capacity;
- public NodeControllerState(INodeController nodeController, NodeRegistration reg) {
+ public NodeControllerState(NodeControllerRemoteProxy nodeController, NodeRegistration reg) {
this.nodeController = nodeController;
ncConfig = reg.getNCConfig();
dataPort = reg.getDataPort();
@@ -251,7 +251,7 @@
return lastHeartbeatDuration++;
}
- public INodeController getNodeController() {
+ public NodeControllerRemoteProxy getNodeController() {
return nodeController;
}
@@ -279,7 +279,7 @@
return capacity;
}
- public synchronized ObjectNode toSummaryJSON() {
+ public synchronized ObjectNode toSummaryJSON() {
ObjectMapper om = new ObjectMapper();
ObjectNode o = om.createObjectNode();
o.put("node-id", ncConfig.getNodeId());
@@ -289,7 +289,7 @@
return o;
}
- public synchronized ObjectNode toDetailedJSON(boolean includeStats, boolean includeConfig) {
+ public synchronized ObjectNode toDetailedJSON(boolean includeStats, boolean includeConfig) {
ObjectMapper om = new ObjectMapper();
ObjectNode o = om.createObjectNode();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 8cca1e0..2d43d42 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -35,6 +35,7 @@
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.resource.NodeCapacity;
@@ -44,6 +45,9 @@
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.scheduler.IResourceManager;
import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortCCJobsFunction;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
public class NodeManager implements INodeManager {
private static final Logger LOGGER = Logger.getLogger(NodeManager.class.getName());
@@ -93,6 +97,13 @@
LOGGER.warning(
"Node with name " + nodeId + " has already registered; failing the node then re-registering.");
removeDeadNode(nodeId);
+ } else {
+ try {
+ IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(ncState.getNodeController().getAddress());
+ ncIPCHandle.send(-1, new AbortCCJobsFunction(), null);
+ } catch (IPCException e) {
+ throw HyracksDataException.create(e);
+ }
}
LOGGER.warning("adding node to registry");
nodeRegistry.put(nodeId, ncState);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 7a9306c..ca1c91b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -107,7 +107,7 @@
private DatasetJobRecord getNonNullDatasetJobRecord(JobId jobId) throws HyracksDataException {
final DatasetJobRecord djr = getDatasetJobRecord(jobId);
if (djr == null) {
- throw HyracksDataException.create(ErrorCode.NO_RESULTSET, jobId);
+ throw HyracksDataException.create(ErrorCode.NO_RESULT_SET, jobId);
}
return djr;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index bf0846f..5866ba5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -28,7 +28,6 @@
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.base.INodeController;
import org.apache.hyracks.control.common.controllers.NodeParameters;
import org.apache.hyracks.control.common.controllers.NodeRegistration;
import org.apache.hyracks.control.common.ipc.CCNCFunctions;
@@ -55,7 +54,8 @@
Map<IOption, Object> ncConfiguration = new HashMap<>();
try {
LOGGER.log(Level.WARNING, "Registering INodeController: id = " + id);
- INodeController nc = new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
+ NodeControllerRemoteProxy nc =
+ new NodeControllerRemoteProxy(ccs.getClusterIPC(), reg.getNodeControllerAddress());
NodeControllerState state = new NodeControllerState(nc, reg);
INodeManager nodeManager = ccs.getNodeManager();
nodeManager.addNode(id, state);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
index f51dd06..7117b6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
@@ -18,14 +18,24 @@
*/
package org.apache.hyracks.control.cc.work;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.work.AbstractWork;
+import org.apache.hyracks.control.common.work.NoOpCallback;
public class RegisterResultPartitionLocationWork extends AbstractWork {
+
+ private static final Logger LOGGER = Logger.getLogger(RegisterResultPartitionLocationWork.class.getName());
+
private final ClusterControllerService ccs;
private final JobId jobId;
@@ -43,8 +53,7 @@
private final NetworkAddress networkAddress;
public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
- boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress
- networkAddress) {
+ boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) {
this.ccs = ccs;
this.jobId = jobId;
this.rsId = rsId;
@@ -58,17 +67,24 @@
@Override
public void run() {
try {
- ccs.getDatasetDirectoryService()
- .registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult, partition, nPartitions,
- networkAddress);
+ ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult,
+ partition, nPartitions, networkAddress);
} catch (HyracksDataException e) {
- throw new RuntimeException(e);
+ LOGGER.log(Level.WARNING, "Failed to register partition location", e);
+ // Should fail the job if exists on cc, otherwise, do nothing
+ JobRun jobRun = ccs.getJobManager().get(jobId);
+ if (jobRun != null) {
+ List<Exception> exceptions = new ArrayList<>();
+ exceptions.add(e);
+ jobRun.getExecutor().abortJob(exceptions, NoOpCallback.INSTANCE);
+ }
}
}
@Override
public String toString() {
- return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions
- + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult + " EmptyResult@" + emptyResult;
+ return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@"
+ + nPartitions + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult
+ + " EmptyResult@" + emptyResult;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index fa6580e..9b9a3b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -27,13 +27,19 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.scheduler.IResourceManager;
import org.apache.hyracks.control.cc.scheduler.ResourceManager;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.ipc.impl.IPCSystem;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
public class NodeManagerTest {
@@ -43,9 +49,9 @@
private static final String NODE2 = "node2";
@Test
- public void testNormal() throws HyracksException {
+ public void testNormal() throws HyracksException, IPCException {
IResourceManager resourceManager = new ResourceManager();
- INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager);
+ INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager);
NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
@@ -68,9 +74,9 @@
}
@Test
- public void testException() throws HyracksException {
+ public void testException() throws HyracksException, IPCException {
IResourceManager resourceManager = new ResourceManager();
- INodeManager nodeManager = new NodeManager(null, makeCCConfig(), resourceManager);
+ INodeManager nodeManager = new NodeManager(mockCcs(), makeCCConfig(), resourceManager);
NodeControllerState ncState1 = mockNodeControllerState(NODE1, true);
boolean invalidNetworkAddress = false;
@@ -86,6 +92,16 @@
verifyEmptyCluster(resourceManager, nodeManager);
}
+ private ClusterControllerService mockCcs() throws IPCException {
+ ClusterControllerService ccs = Mockito.mock(ClusterControllerService.class);
+ IPCSystem ipcSystem = Mockito.mock(IPCSystem.class);
+ IIPCHandle ipcHandle = Mockito.mock(IIPCHandle.class);
+ Mockito.when(ccs.getClusterIPC()).thenReturn(ipcSystem);
+ Mockito.when(ipcSystem.getHandle(Mockito.any())).thenReturn(ipcHandle);
+ Mockito.when(ipcSystem.getHandle(Mockito.any(), Mockito.anyInt())).thenReturn(ipcHandle);
+ return ccs;
+ }
+
@Test
public void testNullNode() throws HyracksException {
IResourceManager resourceManager = new ResourceManager();
@@ -112,6 +128,7 @@
private NodeControllerState mockNodeControllerState(String nodeId, boolean invalidIpAddr) {
NodeControllerState ncState = mock(NodeControllerState.class);
+ NodeControllerRemoteProxy ncProxy = Mockito.mock(NodeControllerRemoteProxy.class);
String ipAddr = invalidIpAddr ? "255.255.255:255" : "127.0.0.2";
NetworkAddress dataAddr = new NetworkAddress(ipAddr, 1001);
NetworkAddress resultAddr = new NetworkAddress(ipAddr, 1002);
@@ -123,6 +140,7 @@
NCConfig ncConfig = new NCConfig(nodeId);
ncConfig.setDataPublicAddress(ipAddr);
when(ncState.getNCConfig()).thenReturn(ncConfig);
+ Mockito.when(ncState.getNodeController()).thenReturn(ncProxy);
return ncState;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 0d46d64..251aed8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -45,8 +45,8 @@
import org.apache.hyracks.control.cc.application.CCServiceContext;
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.cc.cluster.NodeManager;
-import org.apache.hyracks.control.common.base.INodeController;
import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
import org.apache.hyracks.control.common.logs.LogFile;
import org.apache.hyracks.control.common.work.NoOpCallback;
import org.junit.Assert;
@@ -293,7 +293,7 @@
private INodeManager mockNodeManager() {
INodeManager nodeManager = mock(NodeManager.class);
NodeControllerState ncState = mock(NodeControllerState.class);
- INodeController nodeController = mock(INodeController.class);
+ NodeControllerRemoteProxy nodeController = mock(NodeControllerRemoteProxy.class);
when(nodeManager.getNodeControllerState(any())).thenReturn(ncState);
when(ncState.getNodeController()).thenReturn(nodeController);
return nodeManager;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index d42c4a8..4797ed7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -87,6 +87,7 @@
NODE_REGISTRATION_RESULT,
START_TASKS,
ABORT_TASKS,
+ ABORT_ALL_JOBS,
CLEANUP_JOBLET,
REPORT_PARTITION_AVAILABILITY,
SEND_APPLICATION_MESSAGE,
@@ -665,6 +666,16 @@
}
}
+ //TODO: Add CC id to this job to only abort jobs by this CC: https://issues.apache.org/jira/browse/ASTERIXDB-2110
+ public static class AbortCCJobsFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.ABORT_ALL_JOBS;
+ }
+ }
+
public static class DistributeJobFunction extends Function {
private static final long serialVersionUID = 1L;
@@ -782,7 +793,7 @@
// read task attempt descriptors
int tadSize = dis.readInt();
- List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<TaskAttemptDescriptor>();
+ List<TaskAttemptDescriptor> taskDescriptors = new ArrayList<>();
for (int i = 0; i < tadSize; i++) {
TaskAttemptDescriptor tad = TaskAttemptDescriptor.create(dis);
taskDescriptors.add(tad);
@@ -790,7 +801,7 @@
//read connector policies
int cpSize = dis.readInt();
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<>();
for (int i = 0; i < cpSize; i++) {
ConnectorDescriptorId cid = ConnectorDescriptorId.create(dis);
IConnectorPolicy policy = ConnectorPolicyFactory.INSTANCE.getConnectorPolicy(dis);
@@ -1362,8 +1373,8 @@
int cdid = dis.readInt();
int senderIndex = dis.readInt();
int receiverIndex = dis.readInt();
- PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex,
- receiverIndex);
+ PartitionId pid =
+ new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
return pid;
}
@@ -1379,8 +1390,8 @@
int aid = dis.readInt();
int partition = dis.readInt();
int attempt = dis.readInt();
- TaskAttemptId taId = new TaskAttemptId(
- new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), partition), attempt);
+ TaskAttemptId taId =
+ new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid), partition), attempt);
return taId;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
index 2815ae1..d4ccbd9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ControllerRemoteProxy.java
@@ -21,13 +21,14 @@
import java.net.InetSocketAddress;
import java.util.logging.Logger;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.ipc.impl.IPCSystem;
public abstract class ControllerRemoteProxy {
protected final IPCSystem ipc;
- protected final InetSocketAddress inetSocketAddress;
+ private final InetSocketAddress inetSocketAddress;
private final IControllerRemoteProxyIPCEventListener eventListener;
private IIPCHandle ipcHandle;
@@ -36,28 +37,33 @@
}
protected ControllerRemoteProxy(IPCSystem ipc, InetSocketAddress inetSocketAddress,
- IControllerRemoteProxyIPCEventListener eventListener) {
+ IControllerRemoteProxyIPCEventListener eventListener) {
this.ipc = ipc;
this.inetSocketAddress = inetSocketAddress;
- this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() {} : eventListener;
+ this.eventListener = eventListener == null ? new IControllerRemoteProxyIPCEventListener() {
+ } : eventListener;
}
- protected IIPCHandle ensureIpcHandle() throws IPCException {
- final boolean first = ipcHandle == null;
- if (first || !ipcHandle.isConnected()) {
- if (!first) {
- getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
- eventListener.ipcHandleDisconnected(ipcHandle);
- }
- ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
- if (ipcHandle.isConnected()) {
- if (first) {
- eventListener.ipcHandleConnected(ipcHandle);
- } else {
- getLogger().warning("ipcHandle " + ipcHandle + " restored");
- eventListener.ipcHandleRestored(ipcHandle);
+ protected IIPCHandle ensureIpcHandle() throws HyracksDataException {
+ try {
+ final boolean first = ipcHandle == null;
+ if (first || !ipcHandle.isConnected()) {
+ if (!first) {
+ getLogger().warning("ipcHandle " + ipcHandle + " disconnected; retrying connection");
+ eventListener.ipcHandleDisconnected(ipcHandle);
+ }
+ ipcHandle = ipc.getHandle(inetSocketAddress, getRetries(first));
+ if (ipcHandle.isConnected()) {
+ if (first) {
+ eventListener.ipcHandleConnected(ipcHandle);
+ } else {
+ getLogger().warning("ipcHandle " + ipcHandle + " restored");
+ eventListener.ipcHandleRestored(ipcHandle);
+ }
}
}
+ } catch (IPCException e) {
+ throw HyracksDataException.create(e);
}
return ipcHandle;
}
@@ -65,4 +71,8 @@
protected abstract int getRetries(boolean first);
protected abstract Logger getLogger();
+
+ public InetSocketAddress getAddress() {
+ return inetSocketAddress;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index c416942..1eb1393 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -22,6 +22,7 @@
import org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
import org.apache.hyracks.control.nc.task.ShutdownTask;
import org.apache.hyracks.control.nc.task.ThreadDumpTask;
+import org.apache.hyracks.control.nc.work.AbortAllJobsWork;
import org.apache.hyracks.control.nc.work.AbortTasksWork;
import org.apache.hyracks.control.nc.work.ApplicationMessageWork;
import org.apache.hyracks.control.nc.work.CleanupJobletWork;
@@ -55,10 +56,9 @@
CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
switch (fn.getFunctionId()) {
case SEND_APPLICATION_MESSAGE:
- CCNCFunctions.SendApplicationMessageFunction amf =
- (CCNCFunctions.SendApplicationMessageFunction) fn;
- ncs.getWorkQueue().schedule(new ApplicationMessageWork(ncs, amf.getMessage(),
- amf.getDeploymentId(), amf.getNodeId()));
+ CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+ ncs.getWorkQueue().schedule(
+ new ApplicationMessageWork(ncs, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId()));
return;
case START_TASKS:
CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
@@ -69,6 +69,9 @@
CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
ncs.getWorkQueue().schedule(new AbortTasksWork(ncs, atf.getJobId(), atf.getTasks()));
return;
+ case ABORT_ALL_JOBS:
+ ncs.getWorkQueue().schedule(new AbortAllJobsWork(ncs));
+ return;
case CLEANUP_JOBLET:
CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, cjf.getJobId(), cjf.getStatus()));
@@ -76,8 +79,8 @@
case REPORT_PARTITION_AVAILABILITY:
CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
(CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
- ncs.getWorkQueue().schedule(new ReportPartitionAvailabilityWork(ncs,
- rpaf.getPartitionId(), rpaf.getNetworkAddress()));
+ ncs.getWorkQueue().schedule(
+ new ReportPartitionAvailabilityWork(ncs, rpaf.getPartitionId(), rpaf.getNetworkAddress()));
return;
case NODE_REGISTRATION_RESULT:
CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
@@ -92,8 +95,7 @@
case DEPLOY_BINARY:
CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
- ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(),
- dbf.getBinaryURLs()));
+ ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs()));
return;
case UNDEPLOY_BINARY:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b0a702d..f4ec6e4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -125,7 +125,7 @@
private final Map<JobId, Joblet> jobletMap;
- private final Map<JobId, ActivityClusterGraph> preDistributedJobActivityClusterGraphMap;
+ private final Map<JobId, ActivityClusterGraph> preDistributedJobs;
private ExecutorService executor;
@@ -199,7 +199,7 @@
workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
jobletMap = new Hashtable<>();
- preDistributedJobActivityClusterGraphMap = new Hashtable<>();
+ preDistributedJobs = new Hashtable<>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
new File(new File(NodeControllerService.class.getName()), id));
@@ -418,27 +418,27 @@
}
public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException {
- if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
+ if (preDistributedJobs.get(jobId) != null) {
throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
}
- preDistributedJobActivityClusterGraphMap.put(jobId, acg);
+ preDistributedJobs.put(jobId, acg);
}
public void removeActivityClusterGraph(JobId jobId) throws HyracksException {
- if (preDistributedJobActivityClusterGraphMap.get(jobId) == null) {
+ if (preDistributedJobs.get(jobId) == null) {
throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
}
- preDistributedJobActivityClusterGraphMap.remove(jobId);
+ preDistributedJobs.remove(jobId);
}
public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException {
- if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
+ if (preDistributedJobs.get(jobId) != null) {
throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
}
}
public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException {
- return preDistributedJobActivityClusterGraphMap.get(jobId);
+ return preDistributedJobs.get(jobId);
}
public NetworkManager getNetworkManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
similarity index 72%
rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java
rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index 4fb4bf6..56100da 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllTasksWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -18,23 +18,23 @@
*/
package org.apache.hyracks.control.nc.work;
-import java.util.Map;
+import java.util.Collection;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
+import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.control.common.work.SynchronizableWork;
import org.apache.hyracks.control.nc.Joblet;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.Task;
-public class AbortAllTasksWork extends SynchronizableWork {
+public class AbortAllJobsWork extends SynchronizableWork {
- private static final Logger LOGGER = Logger.getLogger(AbortAllTasksWork.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(AbortAllJobsWork.class.getName());
private final NodeControllerService ncs;
- public AbortAllTasksWork(NodeControllerService ncs) {
+ public AbortAllJobsWork(NodeControllerService ncs) {
this.ncs = ncs;
}
@@ -46,14 +46,16 @@
IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
if (dpm != null) {
ncs.getDatasetPartitionManager().abortAllReaders();
+ } else {
+ LOGGER.log(Level.WARNING, "DatasetPartitionManager is null on " + ncs.getId());
}
- for (Joblet ji : ncs.getJobletMap().values()) {
- Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
- for (Task task : taskMap.values()) {
- if (task != null) {
- task.abort();
- }
+ Collection<Joblet> joblets = ncs.getJobletMap().values();
+ for (Joblet ji : joblets) {
+ Collection<Task> tasks = ji.getTaskMap().values();
+ for (Task task : tasks) {
+ task.abort();
}
+ ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, ji.getJobId(), JobStatus.FAILURE));
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
index 4a01fdb..caba5f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -23,6 +23,7 @@
import static org.mockito.Mockito.verify;
import java.io.File;
+import java.lang.reflect.Field;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -32,6 +33,8 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.cluster.NodeManager;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -69,9 +72,17 @@
ccRoot.mkdir();
ccConfig.setRootDir(ccRoot.getAbsolutePath());
ClusterControllerService ccBase = new ClusterControllerService(ccConfig);
+ // The spying below is dangerous since it replaces the ClusterControllerService already referenced by many
+ // objects created in the constructor above
cc = Mockito.spy(ccBase);
cc.start();
+ // The following code partially fixes the problem created by the spying
+ INodeManager nodeManager = cc.getNodeManager();
+ Field ccsInNodeManager = NodeManager.class.getDeclaredField("ccs");
+ ccsInNodeManager.setAccessible(true);
+ ccsInNodeManager.set(nodeManager, cc);
+
NCConfig ncConfig1 = new NCConfig(NC1_ID);
ncConfig1.setClusterAddress("localhost");
ncConfig1.setClusterPort(39001);
@@ -79,7 +90,7 @@
ncConfig1.setDataListenAddress("127.0.0.1");
ncConfig1.setResultListenAddress("127.0.0.1");
ncConfig1.setResultSweepThreshold(5000);
- ncConfig1.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
+ ncConfig1.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
nc1 = Mockito.spy(nc1Base);
nc1.start();
@@ -91,7 +102,7 @@
ncConfig2.setDataListenAddress("127.0.0.1");
ncConfig2.setResultListenAddress("127.0.0.1");
ncConfig2.setResultSweepThreshold(5000);
- ncConfig2.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
+ ncConfig2.setIODevices(new String[] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
nc2 = Mockito.spy(nc2Base);
nc2.start();
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
index aebe2f5..ba1c9a4 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
@@ -27,7 +27,7 @@
private final Map<Long, Request> reqMap;
public RPCInterface() {
- reqMap = new HashMap<Long, RPCInterface.Request>();
+ reqMap = new HashMap<>();
}
public Object call(IIPCHandle handle, Object request) throws Exception {