Changes required to support MessageBroker implementation
This change includes the following:
- API changes to support MessageBroker implementation.
- IResourceIdFactory interface to support application dependent implementation.
Change-Id: Ib9f49234eebe912c48e7f71980433a9b42595741
Reviewed-on: https://asterix-gerrit.ics.uci.edu/485
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
index 29401a1..1fb2291 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
import org.apache.hyracks.api.messages.IMessageBroker;
+import org.apache.hyracks.api.service.IControllerService;
/**
* Base class of the {@link ICCApplicationContext} and the {@link INCApplicationContext}.
@@ -48,4 +49,9 @@
public void setThreadFactory(ThreadFactory threadFactory);
+ /**
+ * @return The controller service which the application context belongs to.
+ */
+ public IControllerService getControllerService();
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index 6023323..fd1d376 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -40,5 +40,5 @@
public IDatasetPartitionManager getDatasetPartitionManager();
- public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId, String nodeId) throws Exception;
+ public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
index 40a1baa..2b166d2 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
@@ -23,6 +23,6 @@
*/
public interface IMessageBroker {
- public void receivedMessage(IMessage message, String nodeId);
+ public void receivedMessage(IMessage message, String nodeId) throws Exception;
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/IService.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
similarity index 91%
rename from hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/IService.java
rename to hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
index d6a4a45..c67ec45 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/IService.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.common.service;
+package org.apache.hyracks.api.service;
-public interface IService {
+public interface IControllerService {
public void start() throws Exception;
public void stop() throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 1c27376..e8b2c27 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -47,6 +47,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobInfo;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.api.topology.TopologyDefinitionParser;
import org.apache.hyracks.control.cc.application.CCApplicationContext;
@@ -84,7 +85,6 @@
import org.apache.hyracks.control.cc.work.TaskFailureWork;
import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
-import org.apache.hyracks.control.common.AbstractRemoteService;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.deployment.DeploymentRun;
@@ -104,7 +104,7 @@
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
import org.xml.sax.InputSource;
-public class ClusterControllerService extends AbstractRemoteService {
+public class ClusterControllerService implements IControllerService {
private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
private final CCConfig ccConfig;
@@ -252,14 +252,14 @@
}
private void startApplication() throws Exception {
- appCtx = new CCApplicationContext(serverCtx, ccContext);
+ appCtx = new CCApplicationContext(this, serverCtx, ccContext);
appCtx.addJobLifecycleListener(datasetDirectoryService);
String className = ccConfig.appCCMainClass;
if (className != null) {
Class<?> c = Class.forName(className);
aep = (ICCApplicationEntryPoint) c.newInstance();
- String[] args = ccConfig.appArgs == null ? null : ccConfig.appArgs.toArray(new String[ccConfig.appArgs
- .size()]);
+ String[] args = ccConfig.appArgs == null ? null
+ : ccConfig.appArgs.toArray(new String[ccConfig.appArgs.size()]);
aep.start(appCtx, args);
}
executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
@@ -359,7 +359,8 @@
private class HyracksClientInterfaceIPCI implements IIPCI {
@Override
- public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload,
+ Exception exception) {
HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
switch (fn.getFunctionId()) {
case GET_CLUSTER_CONTROLLER_INFO: {
@@ -388,8 +389,8 @@
case START_JOB: {
HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
JobId jobId = createJobId();
- workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(), sjf
- .getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
+ workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
+ sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
return;
}
@@ -401,15 +402,15 @@
case GET_DATASET_RESULT_STATUS: {
HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
- workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(), gdrlf
- .getResultSetId(), new IPCResponder<Status>(handle, mid)));
+ workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(),
+ gdrlf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
return;
}
case GET_DATASET_RESULT_LOCATIONS: {
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
- workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
- .getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
+ workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this,
+ gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
return;
}
@@ -438,8 +439,8 @@
case CLI_DEPLOY_BINARY: {
HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
- workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(), dbf
- .getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
+ workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(),
+ dbf.getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
return;
}
@@ -483,22 +484,22 @@
case NODE_HEARTBEAT: {
CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
- workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(), nhf
- .getHeartbeatData()));
+ workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(),
+ nhf.getHeartbeatData()));
return;
}
case NOTIFY_JOBLET_CLEANUP: {
CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
- workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this,
- njcf.getJobId(), njcf.getNodeId()));
+ workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this, njcf.getJobId(),
+ njcf.getNodeId()));
return;
}
case NOTIFY_DEPLOY_BINARY: {
CCNCFunctions.NotifyDeployBinaryFunction ndbf = (CCNCFunctions.NotifyDeployBinaryFunction) fn;
- workQueue.schedule(new NotifyDeployBinaryWork(ClusterControllerService.this,
- ndbf.getDeploymentId(), ndbf.getNodeId(), ndbf.getDeploymentStatus()));
+ workQueue.schedule(new NotifyDeployBinaryWork(ClusterControllerService.this, ndbf.getDeploymentId(),
+ ndbf.getNodeId(), ndbf.getDeploymentStatus()));
return;
}
@@ -510,35 +511,35 @@
case NOTIFY_TASK_COMPLETE: {
CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
- workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(), ntcf
- .getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
+ workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(),
+ ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
return;
}
case NOTIFY_TASK_FAILURE: {
CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
- workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(), ntff
- .getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
+ workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(),
+ ntff.getTaskId(), ntff.getNodeId(), ntff.getExceptions()));
return;
}
case REGISTER_PARTITION_PROVIDER: {
CCNCFunctions.RegisterPartitionProviderFunction rppf = (CCNCFunctions.RegisterPartitionProviderFunction) fn;
- workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this, rppf
- .getPartitionDescriptor()));
+ workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this,
+ rppf.getPartitionDescriptor()));
return;
}
case REGISTER_PARTITION_REQUEST: {
CCNCFunctions.RegisterPartitionRequestFunction rprf = (CCNCFunctions.RegisterPartitionRequestFunction) fn;
- workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this, rprf
- .getPartitionRequest()));
+ workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this,
+ rprf.getPartitionRequest()));
return;
}
case REGISTER_RESULT_PARTITION_LOCATION: {
CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
- workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
- .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
+ workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this,
+ rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
return;
}
@@ -552,15 +553,15 @@
case REPORT_RESULT_PARTITION_FAILURE: {
CCNCFunctions.ReportResultPartitionFailureFunction rrplf = (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
- workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this, rrplf
- .getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
+ workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this,
+ rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
return;
}
case SEND_APPLICATION_MESSAGE: {
CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
- workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(), rsf
- .getDeploymentId(), rsf.getNodeId()));
+ workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(),
+ rsf.getDeploymentId(), rsf.getNodeId()));
return;
}
@@ -637,10 +638,11 @@
deploymentRunMap.remove(deploymentKey);
}
- public synchronized void setShutdownRun(ShutdownRun sRun){
+ public synchronized void setShutdownRun(ShutdownRun sRun) {
shutdownCallback = sRun;
}
- public synchronized ShutdownRun getShutdownRun(){
+
+ public synchronized ShutdownRun getShutdownRun() {
return shutdownCallback;
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index 988b678..a4f569b 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -33,6 +33,8 @@
import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.application.ApplicationContext;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -48,10 +50,13 @@
private List<IJobLifecycleListener> jobLifecycleListeners;
private List<IClusterLifecycleListener> clusterLifecycleListeners;
+ private final ClusterControllerService ccs;
- public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext) throws IOException {
+ public CCApplicationContext(ClusterControllerService ccs, ServerContext serverCtx, ICCContext ccContext)
+ throws IOException {
super(serverCtx);
this.ccContext = ccContext;
+ this.ccs = ccs;
initPendingNodeIds = new HashSet<String>();
deinitPendingNodeIds = new HashSet<String>();
jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
@@ -107,4 +112,9 @@
l.notifyNodeFailure(deadNodeIds);
}
}
+
+ @Override
+ public IControllerService getControllerService() {
+ return ccs;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
index 8718e4f..40a83a2 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -38,7 +38,8 @@
private String nodeId;
private ClusterControllerService ccs;
- public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId, String nodeId) {
+ public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId,
+ String nodeId) {
super(ccs, nodeId, null);
this.ccs = ccs;
this.deploymentId = deploymentId;
@@ -54,7 +55,11 @@
ccs.getExecutor().execute(new Runnable() {
@Override
public void run() {
- ctx.getMessageBroker().receivedMessage(data, nodeId);
+ try {
+ ctx.getMessageBroker().receivedMessage(data, nodeId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
});
} catch (Exception e) {
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/AbstractRemoteService.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/AbstractRemoteService.java
deleted file mode 100644
index 2f63416..0000000
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/AbstractRemoteService.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common;
-
-import org.apache.hyracks.control.common.service.IService;
-
-public abstract class AbstractRemoteService implements IService {
- public AbstractRemoteService() {
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index ec58b63..490faad 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -36,8 +36,8 @@
public interface INodeController {
public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
- List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception;
+ List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
+ EnumSet<JobFlag> flags) throws Exception;
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
@@ -52,4 +52,6 @@
public void dumpState(String stateDumpId) throws Exception;
public void shutDown() throws Exception;
+
+ public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 331d9ef..22ee6b3 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -45,8 +45,8 @@
@Override
public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes,
- List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception {
+ List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies,
+ EnumSet<JobFlag> flags) throws Exception {
CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(deploymentId, jobId, planBytes,
taskDescriptors, connectorPolicies, flags);
ipcHandle.send(-1, stf, null);
@@ -94,4 +94,11 @@
CCNCFunctions.ShutdownRequestFunction sdrf = new CCNCFunctions.ShutdownRequestFunction();
ipcHandle.send(-1, sdrf, null);
}
+
+ @Override
+ public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
+ CCNCFunctions.SendApplicationMessageFunction fn = new CCNCFunctions.SendApplicationMessageFunction(data,
+ deploymentId, nodeId);
+ ipcHandle.send(-1, fn, null);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/AbstractService.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/AbstractService.java
deleted file mode 100644
index d2d0243..0000000
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/AbstractService.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.service;
-
-public abstract class AbstractService implements IService {
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 1a50211..7b5758c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -53,7 +53,7 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
-import org.apache.hyracks.control.common.AbstractRemoteService;
+import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.common.base.IClusterController;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -93,7 +93,7 @@
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-public class NodeControllerService extends AbstractRemoteService {
+public class NodeControllerService implements IControllerService {
private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
private static final double MEMORY_FUDGE_FACTOR = 0.8;
@@ -316,7 +316,7 @@
}
private void startApplication() throws Exception {
- appCtx = new NCApplicationContext(serverCtx, ctx, id, memoryManager, lccm);
+ appCtx = new NCApplicationContext(this, serverCtx, ctx, id, memoryManager, lccm);
String className = ncConfig.appNCMainClass;
if (className != null) {
Class<?> c = Class.forName(className);
@@ -558,8 +558,8 @@
}
}
- public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
- ccs.sendApplicationMessageToCC(data, deploymentId, nodeId);
+ public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId) throws Exception {
+ ccs.sendApplicationMessageToCC(data, deploymentId, id);
}
public IDatasetPartitionManager getDatasetPartitionManager() {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 9a3582a..12df264 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -380,7 +380,7 @@
}
@Override
- public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId, String nodeId) throws Exception {
- this.ncs.sendApplicationMessageToCC(message, deploymentId, nodeId);
+ public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception {
+ this.ncs.sendApplicationMessageToCC(message, deploymentId);
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
index 0a2aaec..e262738 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
@@ -27,8 +27,10 @@
import org.apache.hyracks.api.context.IHyracksRootContext;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.resources.memory.IMemoryManager;
+import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.common.application.ApplicationContext;
import org.apache.hyracks.control.common.context.ServerContext;
+import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
public class NCApplicationContext extends ApplicationContext implements INCApplicationContext {
@@ -38,14 +40,17 @@
private final MemoryManager memoryManager;
private Object appObject;
private IStateDumpHandler sdh;
+ private final NodeControllerService ncs;
- public NCApplicationContext(ServerContext serverCtx, IHyracksRootContext rootCtx, String nodeId,
- MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager) throws IOException {
+ public NCApplicationContext(NodeControllerService ncs, ServerContext serverCtx, IHyracksRootContext rootCtx,
+ String nodeId, MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager)
+ throws IOException {
super(serverCtx);
this.lccm = lifeCyclecomponentManager;
this.nodeId = nodeId;
this.rootCtx = rootCtx;
this.memoryManager = memoryManager;
+ this.ncs = ncs;
sdh = new IStateDumpHandler() {
@Override
@@ -97,4 +102,9 @@
public IMemoryManager getMemoryManager() {
return memoryManager;
}
+
+ @Override
+ public IControllerService getControllerService() {
+ return ncs;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 2b6b208..c6680bc 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -31,8 +31,8 @@
import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
import org.apache.hyracks.storage.common.file.ILocalResourceFactory;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
import org.apache.hyracks.storage.common.file.LocalResource;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
public abstract class IndexDataflowHelper implements IIndexDataflowHelper {
@@ -40,7 +40,7 @@
protected final IHyracksTaskContext ctx;
protected final IIndexLifecycleManager lcManager;
protected final ILocalResourceRepository localResourceRepository;
- protected final ResourceIdFactory resourceIdFactory;
+ protected final IResourceIdFactory resourceIdFactory;
protected final FileReference file;
protected final int partition;
protected final int ioDeviceId;
@@ -57,8 +57,9 @@
this.resourceIdFactory = opDesc.getStorageManager().getResourceIdFactory(ctx);
this.partition = partition;
this.ioDeviceId = opDesc.getFileSplitProvider().getFileSplits()[partition].getIODeviceId();
- this.file = new FileReference(new File(IndexFileNameUtil.prepareFileName(opDesc.getFileSplitProvider()
- .getFileSplits()[partition].getLocalFile().getFile().getPath(), ioDeviceId)));
+ this.file = new FileReference(new File(IndexFileNameUtil.prepareFileName(
+ opDesc.getFileSplitProvider().getFileSplits()[partition].getLocalFile().getFile().getPath(),
+ ioDeviceId)));
this.durable = durable;
this.resourceName = file.getFile().getPath();
}
@@ -92,8 +93,8 @@
resourceID = resourceIdFactory.createId();
ILocalResourceFactory localResourceFactory = opDesc.getLocalResourceFactoryProvider()
.getLocalResourceFactory();
- localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, resourceName,
- partition));
+ localResourceRepository
+ .insert(localResourceFactory.createLocalResource(resourceID, resourceName, partition));
} catch (IOException e) {
throw new HyracksDataException(e);
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManagerInterface.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManagerInterface.java
index 8d74ce3..7a68051 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManagerInterface.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManagerInterface.java
@@ -24,7 +24,7 @@
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.file.IFileMapProvider;
import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
public interface IStorageManagerInterface extends Serializable {
public IBufferCache getBufferCache(IHyracksTaskContext ctx);
@@ -33,5 +33,5 @@
public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx);
- public ResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx);
+ public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx);
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/IService.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
similarity index 72%
copy from hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/IService.java
copy to hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
index d6a4a45..42fd6d4 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/service/IService.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
@@ -16,10 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.hyracks.control.common.service;
+package org.apache.hyracks.storage.common.file;
-public interface IService {
- public void start() throws Exception;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
- public void stop() throws Exception;
+public interface IResourceIdFactory {
+
+ /**
+ * @return A unique monotonically increasing id.
+ * @throws Exception
+ */
+ long createId() throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/ResourceIdFactory.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/ResourceIdFactory.java
index 0adf998..625d247 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/ResourceIdFactory.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/ResourceIdFactory.java
@@ -20,13 +20,14 @@
import java.util.concurrent.atomic.AtomicLong;
-public class ResourceIdFactory {
+public class ResourceIdFactory implements IResourceIdFactory {
private AtomicLong id = null;
public ResourceIdFactory(long initialValue) {
id = new AtomicLong(initialValue);
}
+ @Override
public long createId() {
return id.getAndIncrement();
}
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
index b2a5765..f619be0 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.api.resources.memory.IMemoryManager;
+import org.apache.hyracks.api.service.IControllerService;
public class TestNCApplicationContext implements INCApplicationContext {
private final ILifeCycleComponentManager lccm;
@@ -128,4 +129,9 @@
@Override
public void setStateDumpHandler(IStateDumpHandler handler) {
}
+
+ @Override
+ public IControllerService getControllerService() {
+ return null;
+ }
}
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index d8cf599..6d954eb 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -28,7 +28,6 @@
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
@@ -41,7 +40,7 @@
private final TaskAttemptId taskId;
private WorkspaceFileFactory fileFactory;
- public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) throws HyracksException {
+ public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) {
this.jobletContext = jobletContext;
this.taskId = taskId;
fileFactory = new WorkspaceFileFactory(this, (IOManager) getIOManager());
@@ -130,8 +129,7 @@
}
@Override
- public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId, String nodeId) throws Exception {
- // TODO Auto-generated method stub
+ public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymentId) throws Exception {
}
@@ -139,4 +137,4 @@
public ExecutorService getExecutorService() {
return null;
}
-}
+}
\ No newline at end of file