Added ability to use other ser-deser techniques than just java serialization for IPC. Added custom ser-deser for high-traffic messages
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1049 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index b474ee1..8c9409c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.RPCInterface;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
/**
* Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
@@ -65,7 +66,7 @@
public HyracksConnection(String ccHost, int ccPort) throws Exception {
this.ccHost = ccHost;
RPCInterface rpci = new RPCInterface();
- ipc = new IPCSystem(new InetSocketAddress(0), rpci);
+ ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
ipc.start();
IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle, rpci);
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
index 1424a63..703f74b 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
@@ -15,20 +15,16 @@
package edu.uci.ics.hyracks.api.client;
import java.io.Serializable;
-import java.net.InetAddress;
public class NodeControllerInfo implements Serializable {
private static final long serialVersionUID = 1L;
private final String nodeId;
- private final InetAddress ipAddress;
-
private final NodeStatus status;
- public NodeControllerInfo(String nodeId, InetAddress ipAddress, NodeStatus status) {
+ public NodeControllerInfo(String nodeId, NodeStatus status) {
this.nodeId = nodeId;
- this.ipAddress = ipAddress;
this.status = status;
}
@@ -36,10 +32,6 @@
return nodeId;
}
- public InetAddress getIpAddress() {
- return ipAddress;
- }
-
public NodeStatus getStatus() {
return status;
}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
index 868221d..d176c3c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
@@ -15,21 +15,20 @@
package edu.uci.ics.hyracks.api.comm;
import java.io.Serializable;
-import java.net.InetAddress;
public final class NetworkAddress implements Serializable {
private static final long serialVersionUID = 1L;
- private final InetAddress ipAddress;
+ private final byte[] ipAddress;
private final int port;
- public NetworkAddress(InetAddress ipAddress, int port) {
+ public NetworkAddress(byte[] ipAddress, int port) {
this.ipAddress = ipAddress;
this.port = port;
}
- public InetAddress getIpAddress() {
+ public byte[] getIpAddress() {
return ipAddress;
}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index 75cc245..41b4c23 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -19,9 +19,9 @@
public final class ActivityId implements Serializable {
private static final long serialVersionUID = 1L;
private final OperatorDescriptorId odId;
- private final long id;
+ private final int id;
- public ActivityId(OperatorDescriptorId odId, long id) {
+ public ActivityId(OperatorDescriptorId odId, int id) {
this.odId = odId;
this.id = id;
}
@@ -30,7 +30,7 @@
return odId;
}
- public long getLocalId() {
+ public int getLocalId() {
return id;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 2578b88..d3e472b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -60,8 +60,8 @@
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions.Function;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
import edu.uci.ics.hyracks.control.common.logs.LogFile;
import edu.uci.ics.hyracks.control.common.work.IPCResponder;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
@@ -69,6 +69,7 @@
import edu.uci.ics.hyracks.ipc.api.IIPCI;
import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
public class ClusterControllerService extends AbstractRemoteService {
private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
@@ -119,9 +120,11 @@
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
executor = Executors.newCachedThreadPool();
IIPCI ccIPCI = new ClusterControllerIPCI();
- clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI);
+ clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
+ new CCNCFunctions.SerializerDeserializer());
IIPCI ciIPCI = new HyracksClientInterfaceIPCI();
- clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI);
+ clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
+ new JavaSerializationBasedPayloadSerializerDeserializer());
webServer = new WebServer(this);
activeRunMap = new HashMap<JobId, JobRun>();
runMapArchive = new LinkedHashMap<JobId, JobRun>() {
@@ -317,69 +320,69 @@
private class ClusterControllerIPCI implements IIPCI {
@Override
public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
- ClusterControllerFunctions.Function fn = (Function) payload;
+ CCNCFunctions.Function fn = (Function) payload;
switch (fn.getFunctionId()) {
case REGISTER_NODE: {
- ClusterControllerFunctions.RegisterNodeFunction rnf = (ClusterControllerFunctions.RegisterNodeFunction) fn;
+ CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn;
workQueue.schedule(new RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration()));
return;
}
case UNREGISTER_NODE: {
- ClusterControllerFunctions.UnregisterNodeFunction unf = (ClusterControllerFunctions.UnregisterNodeFunction) fn;
+ CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
workQueue.schedule(new UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId()));
return;
}
case NODE_HEARTBEAT: {
- ClusterControllerFunctions.NodeHeartbeatFunction nhf = (ClusterControllerFunctions.NodeHeartbeatFunction) fn;
+ CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(), nhf
.getHeartbeatData()));
return;
}
case NOTIFY_JOBLET_CLEANUP: {
- ClusterControllerFunctions.NotifyJobletCleanupFunction njcf = (ClusterControllerFunctions.NotifyJobletCleanupFunction) fn;
+ CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this,
njcf.getJobId(), njcf.getNodeId()));
return;
}
case REPORT_PROFILE: {
- ClusterControllerFunctions.ReportProfileFunction rpf = (ClusterControllerFunctions.ReportProfileFunction) fn;
+ CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
return;
}
case NOTIFY_TASK_COMPLETE: {
- ClusterControllerFunctions.NotifyTaskCompleteFunction ntcf = (ClusterControllerFunctions.NotifyTaskCompleteFunction) fn;
+ CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(), ntcf
.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
return;
}
case NOTIFY_TASK_FAILURE: {
- ClusterControllerFunctions.NotifyTaskFailureFunction ntff = (ClusterControllerFunctions.NotifyTaskFailureFunction) fn;
+ CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(), ntff
.getTaskId(), ntff.getDetails(), ntff.getDetails()));
return;
}
case REGISTER_PARTITION_PROVIDER: {
- ClusterControllerFunctions.RegisterPartitionProviderFunction rppf = (ClusterControllerFunctions.RegisterPartitionProviderFunction) fn;
+ CCNCFunctions.RegisterPartitionProviderFunction rppf = (CCNCFunctions.RegisterPartitionProviderFunction) fn;
workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this, rppf
.getPartitionDescriptor()));
return;
}
case REGISTER_PARTITION_REQUEST: {
- ClusterControllerFunctions.RegisterPartitionRequestFunction rprf = (ClusterControllerFunctions.RegisterPartitionRequestFunction) fn;
+ CCNCFunctions.RegisterPartitionRequestFunction rprf = (CCNCFunctions.RegisterPartitionRequestFunction) fn;
workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this, rprf
.getPartitionRequest()));
return;
}
case APPLICATION_STATE_CHANGE_RESPONSE: {
- ClusterControllerFunctions.ApplicationStateChangeResponseFunction astrf = (ClusterControllerFunctions.ApplicationStateChangeResponseFunction) fn;
+ CCNCFunctions.ApplicationStateChangeResponseFunction astrf = (CCNCFunctions.ApplicationStateChangeResponseFunction) fn;
workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
return;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
index 2d8ae85..f6271fe 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
@@ -6,7 +6,7 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.common.work.IResultCallback;
@@ -14,10 +14,10 @@
private static final Logger LOGGER = Logger.getLogger(ApplicationStateChangeWork.class.getName());
private final ClusterControllerService ccs;
- private final ClusterControllerFunctions.ApplicationStateChangeResponseFunction ascrf;
+ private final CCNCFunctions.ApplicationStateChangeResponseFunction ascrf;
public ApplicationStateChangeWork(ClusterControllerService ccs,
- ClusterControllerFunctions.ApplicationStateChangeResponseFunction ascrf) {
+ CCNCFunctions.ApplicationStateChangeResponseFunction ascrf) {
this.ccs = ccs;
this.ascrf = ascrf;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 49a3be9..9e8d130 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -39,8 +39,7 @@
Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
- result.put(e.getKey(), new NodeControllerInfo(e.getKey(), e.getValue().getDataPort().getIpAddress(),
- NodeStatus.ALIVE));
+ result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE));
}
callback.setValue(result);
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
index cba4b4b5..e4e0aae 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
@@ -25,7 +25,7 @@
import edu.uci.ics.hyracks.control.common.base.INodeController;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.ipc.NodeControllerFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
import edu.uci.ics.hyracks.control.common.ipc.NodeControllerRemoteProxy;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
@@ -46,7 +46,7 @@
String id = reg.getNodeId();
IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
- NodeControllerFunctions.NodeRegistrationResult result = null;
+ CCNCFunctions.NodeRegistrationResult result = null;
try {
INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
@@ -69,9 +69,9 @@
params.setClusterControllerInfo(ccs.getClusterControllerInfo());
params.setHeartbeatPeriod(ccs.getCCConfig().heartbeatPeriod);
params.setProfileDumpPeriod(ccs.getCCConfig().profileDumpPeriod);
- result = new NodeControllerFunctions.NodeRegistrationResult(params, null);
+ result = new CCNCFunctions.NodeRegistrationResult(params, null);
} catch (Exception e) {
- result = new NodeControllerFunctions.NodeRegistrationResult(null, e);
+ result = new CCNCFunctions.NodeRegistrationResult(null, e);
}
ncIPCHandle.send(-1, result, null);
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
new file mode 100644
index 0000000..9ea4636
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -0,0 +1,792 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+public class CCNCFunctions {
+ private static final int FID_CODE_SIZE = 1;
+
+ public enum FunctionId {
+ REGISTER_NODE,
+ UNREGISTER_NODE,
+ NOTIFY_JOBLET_CLEANUP,
+ NOTIFY_TASK_COMPLETE,
+ NOTIFY_TASK_FAILURE,
+ NODE_HEARTBEAT,
+ REPORT_PROFILE,
+ REGISTER_PARTITION_PROVIDER,
+ REGISTER_PARTITION_REQUEST,
+ APPLICATION_STATE_CHANGE_RESPONSE,
+
+ NODE_REGISTRATION_RESULT,
+ START_TASKS,
+ ABORT_TASKS,
+ CLEANUP_JOBLET,
+ CREATE_APPLICATION,
+ DESTROY_APPLICATION,
+ REPORT_PARTITION_AVAILABILITY,
+
+ OTHER
+ }
+
+ public static abstract class Function implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public abstract FunctionId getFunctionId();
+ }
+
+ public static class RegisterNodeFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final NodeRegistration reg;
+
+ public RegisterNodeFunction(NodeRegistration reg) {
+ this.reg = reg;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_NODE;
+ }
+
+ public NodeRegistration getNodeRegistration() {
+ return reg;
+ }
+ }
+
+ public static class UnregisterNodeFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+
+ public UnregisterNodeFunction(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.UNREGISTER_NODE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ public static class NotifyTaskCompleteFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final TaskAttemptId taskId;
+ private final String nodeId;
+ private final TaskProfile statistics;
+
+ public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
+ this.jobId = jobId;
+ this.taskId = taskId;
+ this.nodeId = nodeId;
+ this.statistics = statistics;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_TASK_COMPLETE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public TaskAttemptId getTaskId() {
+ return taskId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public TaskProfile getStatistics() {
+ return statistics;
+ }
+ }
+
+ public static class NotifyTaskFailureFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final TaskAttemptId taskId;
+ private final String nodeId;
+ private final String details;
+
+ public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
+ this.jobId = jobId;
+ this.taskId = taskId;
+ this.nodeId = nodeId;
+ this.details = details;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_TASK_FAILURE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public TaskAttemptId getTaskId() {
+ return taskId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public String getDetails() {
+ return details;
+ }
+ }
+
+ public static class NotifyJobletCleanupFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final String nodeId;
+
+ public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
+ this.jobId = jobId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_JOBLET_CLEANUP;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ public static class NodeHeartbeatFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final HeartbeatData hbData;
+
+ public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+ this.nodeId = nodeId;
+ this.hbData = hbData;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NODE_HEARTBEAT;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public HeartbeatData getHeartbeatData() {
+ return hbData;
+ }
+ }
+
+ public static class ReportProfileFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final List<JobProfile> profiles;
+
+ public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
+ this.nodeId = nodeId;
+ this.profiles = profiles;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_PROFILE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public List<JobProfile> getProfiles() {
+ return profiles;
+ }
+ }
+
+ public static class RegisterPartitionProviderFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionDescriptor partitionDescriptor;
+
+ public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
+ this.partitionDescriptor = partitionDescriptor;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_PARTITION_PROVIDER;
+ }
+
+ public PartitionDescriptor getPartitionDescriptor() {
+ return partitionDescriptor;
+ }
+
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+ DataInputStream dis = new DataInputStream(bais);
+
+ // Read PartitionId
+ PartitionId pid = readPartitionId(dis);
+
+ // Read nodeId
+ String nodeId = dis.readUTF();
+
+ // Read TaskAttemptId
+ TaskAttemptId taId = readTaskAttemptId(dis);
+
+ // Read reusable flag
+ boolean reusable = dis.readBoolean();
+
+ // Read Partition State
+ PartitionState state = readPartitionState(dis);
+
+ PartitionDescriptor pd = new PartitionDescriptor(pid, nodeId, taId, reusable);
+ pd.setState(state);
+ return new RegisterPartitionProviderFunction(pd);
+ }
+
+ public static void serialize(OutputStream out, Object object) throws Exception {
+ RegisterPartitionProviderFunction fn = (RegisterPartitionProviderFunction) object;
+
+ DataOutputStream dos = new DataOutputStream(out);
+
+ PartitionDescriptor pd = fn.getPartitionDescriptor();
+
+ // Write PartitionId
+ writePartitionId(dos, pd.getPartitionId());
+
+ // Write nodeId
+ dos.writeUTF(pd.getNodeId());
+
+ // Write TaskAttemptId
+ writeTaskAttemptId(dos, pd.getProducingTaskAttemptId());
+
+ // Write reusable flag
+ dos.writeBoolean(pd.isReusable());
+
+ // Write Partition State
+ writePartitionState(dos, pd.getState());
+ }
+ }
+
+ public static class RegisterPartitionRequestFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionRequest partitionRequest;
+
+ public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
+ this.partitionRequest = partitionRequest;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_PARTITION_REQUEST;
+ }
+
+ public PartitionRequest getPartitionRequest() {
+ return partitionRequest;
+ }
+
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+ DataInputStream dis = new DataInputStream(bais);
+
+ // Read PartitionId
+ PartitionId pid = readPartitionId(dis);
+
+ // Read nodeId
+ String nodeId = dis.readUTF();
+
+ // Read TaskAttemptId
+ TaskAttemptId taId = readTaskAttemptId(dis);
+
+ // Read Partition State
+ PartitionState state = readPartitionState(dis);
+
+ PartitionRequest pr = new PartitionRequest(pid, nodeId, taId, state);
+ return new RegisterPartitionRequestFunction(pr);
+ }
+
+ public static void serialize(OutputStream out, Object object) throws Exception {
+ RegisterPartitionRequestFunction fn = (RegisterPartitionRequestFunction) object;
+
+ DataOutputStream dos = new DataOutputStream(out);
+
+ PartitionRequest pr = fn.getPartitionRequest();
+
+ // Write PartitionId
+ writePartitionId(dos, pr.getPartitionId());
+
+ // Write nodeId
+ dos.writeUTF(pr.getNodeId());
+
+ // Write TaskAttemptId
+ writeTaskAttemptId(dos, pr.getRequestingTaskAttemptId());
+
+ // Write Partition State
+ writePartitionState(dos, pr.getMinimumState());
+ }
+ }
+
+ public static class ApplicationStateChangeResponseFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final String appName;
+ private final ApplicationStatus status;
+
+ public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
+ this.nodeId = nodeId;
+ this.appName = appName;
+ this.status = status;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public String getApplicationName() {
+ return appName;
+ }
+
+ public ApplicationStatus getStatus() {
+ return status;
+ }
+ }
+
+ public static class NodeRegistrationResult extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final NodeParameters params;
+
+ private final Exception exception;
+
+ public NodeRegistrationResult(NodeParameters params, Exception exception) {
+ this.params = params;
+ this.exception = exception;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NODE_REGISTRATION_RESULT;
+ }
+
+ public NodeParameters getNodeParameters() {
+ return params;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+ }
+
+ public static class StartTasksFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final JobId jobId;
+ private final byte[] planBytes;
+ private final List<TaskAttemptDescriptor> taskDescriptors;
+ private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+
+ public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
+ List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+ this.appName = appName;
+ this.jobId = jobId;
+ this.planBytes = planBytes;
+ this.taskDescriptors = taskDescriptors;
+ this.connectorPolicies = connectorPolicies;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_TASKS;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public byte[] getPlanBytes() {
+ return planBytes;
+ }
+
+ public List<TaskAttemptDescriptor> getTaskDescriptors() {
+ return taskDescriptors;
+ }
+
+ public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
+ return connectorPolicies;
+ }
+ }
+
+ public static class AbortTasksFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final List<TaskAttemptId> tasks;
+
+ public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
+ this.jobId = jobId;
+ this.tasks = tasks;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.ABORT_TASKS;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public List<TaskAttemptId> getTasks() {
+ return tasks;
+ }
+ }
+
+ public static class CleanupJobletFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final JobStatus status;
+
+ public CleanupJobletFunction(JobId jobId, JobStatus status) {
+ this.jobId = jobId;
+ this.status = status;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CLEANUP_JOBLET;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public JobStatus getStatus() {
+ return status;
+ }
+ }
+
+ public static class CreateApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final boolean deployHar;
+ private final byte[] serializedDistributedState;
+
+ public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
+ this.appName = appName;
+ this.deployHar = deployHar;
+ this.serializedDistributedState = serializedDistributedState;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CREATE_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public boolean isDeployHar() {
+ return deployHar;
+ }
+
+ public byte[] getSerializedDistributedState() {
+ return serializedDistributedState;
+ }
+ }
+
+ public static class DestroyApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public DestroyApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class ReportPartitionAvailabilityFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionId pid;
+ private final NetworkAddress networkAddress;
+
+ public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
+ this.pid = pid;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_PARTITION_AVAILABILITY;
+ }
+
+ public PartitionId getPartitionId() {
+ return pid;
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+ DataInputStream dis = new DataInputStream(bais);
+
+ // Read PartitionId
+ PartitionId pid = readPartitionId(dis);
+
+ // Read NetworkAddress
+ NetworkAddress networkAddress = readNetworkAddress(dis);
+
+ return new ReportPartitionAvailabilityFunction(pid, networkAddress);
+ }
+
+ public static void serialize(OutputStream out, Object object) throws Exception {
+ ReportPartitionAvailabilityFunction fn = (ReportPartitionAvailabilityFunction) object;
+
+ DataOutputStream dos = new DataOutputStream(out);
+
+ // Write PartitionId
+ writePartitionId(dos, fn.getPartitionId());
+
+ // Write NetworkAddress
+ writeNetworkAddress(dos, fn.getNetworkAddress());
+ }
+ }
+
+ public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
+ private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
+
+ public SerializerDeserializer() {
+ javaSerde = new JavaSerializationBasedPayloadSerializerDeserializer();
+ }
+
+ @Override
+ public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+ if (length < FID_CODE_SIZE) {
+ throw new IllegalStateException("Message size too small: " + length);
+ }
+ byte fid = buffer.get();
+ return deserialize(fid, buffer, length - FID_CODE_SIZE);
+ }
+
+ @Override
+ public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+ if (length < FID_CODE_SIZE) {
+ throw new IllegalStateException("Message size too small: " + length);
+ }
+ byte fid = buffer.get();
+ if (fid != FunctionId.OTHER.ordinal()) {
+ throw new IllegalStateException("Expected FID for OTHER, found: " + fid);
+ }
+ return (Exception) deserialize(fid, buffer, length - FID_CODE_SIZE);
+ }
+
+ @Override
+ public byte[] serializeObject(Object object) throws Exception {
+ if (object instanceof Function) {
+ Function fn = (Function) object;
+ return serialize(object, (byte) fn.getFunctionId().ordinal());
+ } else {
+ return serialize(object, (byte) FunctionId.OTHER.ordinal());
+ }
+ }
+
+ @Override
+ public byte[] serializeException(Exception object) throws Exception {
+ return serialize(object, (byte) FunctionId.OTHER.ordinal());
+ }
+
+ private byte[] serialize(Object object, byte fid) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ baos.write(fid);
+ serialize(baos, object, fid);
+ JavaSerializationBasedPayloadSerializerDeserializer.serialize(baos, object);
+ baos.close();
+ return baos.toByteArray();
+ }
+
+ private void serialize(OutputStream out, Object object, byte fid) throws Exception {
+ switch (FunctionId.values()[fid]) {
+ case REGISTER_PARTITION_PROVIDER:
+ RegisterPartitionProviderFunction.serialize(out, object);
+ return;
+
+ case REGISTER_PARTITION_REQUEST:
+ RegisterPartitionRequestFunction.serialize(out, object);
+ return;
+
+ case REPORT_PARTITION_AVAILABILITY:
+ ReportPartitionAvailabilityFunction.serialize(out, object);
+ return;
+ }
+ JavaSerializationBasedPayloadSerializerDeserializer.serialize(out, object);
+ }
+
+ private Object deserialize(byte fid, ByteBuffer buffer, int length) throws Exception {
+ switch (FunctionId.values()[fid]) {
+ case REGISTER_PARTITION_PROVIDER:
+ return RegisterPartitionProviderFunction.deserialize(buffer, length);
+
+ case REGISTER_PARTITION_REQUEST:
+ return RegisterPartitionRequestFunction.deserialize(buffer, length);
+
+ case REPORT_PARTITION_AVAILABILITY:
+ return ReportPartitionAvailabilityFunction.deserialize(buffer, length);
+ }
+
+ return javaSerde.deserializeObject(buffer, length);
+ }
+ }
+
+ private static PartitionId readPartitionId(DataInputStream dis) throws IOException {
+ long jobId = dis.readLong();
+ int cdid = dis.readInt();
+ int senderIndex = dis.readInt();
+ int receiverIndex = dis.readInt();
+ PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
+ return pid;
+ }
+
+ private static void writePartitionId(DataOutputStream dos, PartitionId pid) throws IOException {
+ dos.writeLong(pid.getJobId().getId());
+ dos.writeInt(pid.getConnectorDescriptorId().getId());
+ dos.writeInt(pid.getSenderIndex());
+ dos.writeInt(pid.getReceiverIndex());
+ }
+
+ private static TaskAttemptId readTaskAttemptId(DataInputStream dis) throws IOException {
+ int odid = dis.readInt();
+ int aid = dis.readInt();
+ int partition = dis.readInt();
+ int attempt = dis.readInt();
+ TaskAttemptId taId = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid),
+ partition), attempt);
+ return taId;
+ }
+
+ private static void writeTaskAttemptId(DataOutputStream dos, TaskAttemptId taId) throws IOException {
+ TaskId tid = taId.getTaskId();
+ ActivityId aid = tid.getActivityId();
+ OperatorDescriptorId odId = aid.getOperatorDescriptorId();
+ dos.writeInt(odId.getId());
+ dos.writeInt(aid.getLocalId());
+ dos.writeInt(tid.getPartition());
+ dos.writeInt(taId.getAttempt());
+ }
+
+ private static PartitionState readPartitionState(DataInputStream dis) throws IOException {
+ PartitionState state = PartitionState.values()[dis.readInt()];
+ return state;
+ }
+
+ private static void writePartitionState(DataOutputStream dos, PartitionState state) throws IOException {
+ dos.writeInt(state.ordinal());
+ }
+
+ private static NetworkAddress readNetworkAddress(DataInputStream dis) throws IOException {
+ int bLen = dis.readInt();
+ byte[] ipAddress = new byte[bLen];
+ dis.read(ipAddress);
+ int port = dis.readInt();
+ NetworkAddress networkAddress = new NetworkAddress(ipAddress, port);
+ return networkAddress;
+ }
+
+ private static void writeNetworkAddress(DataOutputStream dos, NetworkAddress networkAddress) throws IOException {
+ byte[] ipAddress = networkAddress.getIpAddress();
+ dos.writeInt(ipAddress.length);
+ dos.write(ipAddress);
+ dos.writeInt(networkAddress.getPort());
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
deleted file mode 100644
index 2b6792e..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.common.ipc;
-
-import java.io.Serializable;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
-import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
-import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
-import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
-
-public class ClusterControllerFunctions {
- public enum FunctionId {
- REGISTER_NODE,
- UNREGISTER_NODE,
- NOTIFY_TASK_COMPLETE,
- NOTIFY_TASK_FAILURE,
- NOTIFY_JOBLET_CLEANUP,
- NODE_HEARTBEAT,
- REPORT_PROFILE,
- REGISTER_PARTITION_PROVIDER,
- REGISTER_PARTITION_REQUEST,
- APPLICATION_STATE_CHANGE_RESPONSE,
- }
-
- public static abstract class Function implements Serializable {
- private static final long serialVersionUID = 1L;
-
- public abstract FunctionId getFunctionId();
- }
-
- public static class RegisterNodeFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final NodeRegistration reg;
-
- public RegisterNodeFunction(NodeRegistration reg) {
- this.reg = reg;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REGISTER_NODE;
- }
-
- public NodeRegistration getNodeRegistration() {
- return reg;
- }
- }
-
- public static class UnregisterNodeFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String nodeId;
-
- public UnregisterNodeFunction(String nodeId) {
- this.nodeId = nodeId;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.UNREGISTER_NODE;
- }
-
- public String getNodeId() {
- return nodeId;
- }
- }
-
- public static class NotifyTaskCompleteFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
- private final TaskAttemptId taskId;
- private final String nodeId;
- private final TaskProfile statistics;
-
- public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
- this.jobId = jobId;
- this.taskId = taskId;
- this.nodeId = nodeId;
- this.statistics = statistics;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.NOTIFY_TASK_COMPLETE;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public TaskAttemptId getTaskId() {
- return taskId;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public TaskProfile getStatistics() {
- return statistics;
- }
- }
-
- public static class NotifyTaskFailureFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
- private final TaskAttemptId taskId;
- private final String nodeId;
- private final String details;
-
- public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
- this.jobId = jobId;
- this.taskId = taskId;
- this.nodeId = nodeId;
- this.details = details;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.NOTIFY_TASK_FAILURE;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public TaskAttemptId getTaskId() {
- return taskId;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public String getDetails() {
- return details;
- }
- }
-
- public static class NotifyJobletCleanupFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
- private final String nodeId;
-
- public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
- this.jobId = jobId;
- this.nodeId = nodeId;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.NOTIFY_JOBLET_CLEANUP;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public String getNodeId() {
- return nodeId;
- }
- }
-
- public static class NodeHeartbeatFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String nodeId;
- private final HeartbeatData hbData;
-
- public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
- this.nodeId = nodeId;
- this.hbData = hbData;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.NODE_HEARTBEAT;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public HeartbeatData getHeartbeatData() {
- return hbData;
- }
- }
-
- public static class ReportProfileFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String nodeId;
- private final List<JobProfile> profiles;
-
- public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
- this.nodeId = nodeId;
- this.profiles = profiles;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REPORT_PROFILE;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public List<JobProfile> getProfiles() {
- return profiles;
- }
- }
-
- public static class RegisterPartitionProviderFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final PartitionDescriptor partitionDescriptor;
-
- public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
- this.partitionDescriptor = partitionDescriptor;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REGISTER_PARTITION_PROVIDER;
- }
-
- public PartitionDescriptor getPartitionDescriptor() {
- return partitionDescriptor;
- }
- }
-
- public static class RegisterPartitionRequestFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final PartitionRequest partitionRequest;
-
- public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
- this.partitionRequest = partitionRequest;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REGISTER_PARTITION_REQUEST;
- }
-
- public PartitionRequest getPartitionRequest() {
- return partitionRequest;
- }
- }
-
- public static class ApplicationStateChangeResponseFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String nodeId;
- private final String appName;
- private final ApplicationStatus status;
-
- public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
- this.nodeId = nodeId;
- this.appName = appName;
- this.status = status;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public String getApplicationName() {
- return appName;
- }
-
- public ApplicationStatus getStatus() {
- return status;
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index b7dd0af..a0dabdd 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -37,13 +37,13 @@
@Override
public void registerNode(NodeRegistration reg) throws Exception {
- ClusterControllerFunctions.RegisterNodeFunction fn = new ClusterControllerFunctions.RegisterNodeFunction(reg);
+ CCNCFunctions.RegisterNodeFunction fn = new CCNCFunctions.RegisterNodeFunction(reg);
ipcHandle.send(-1, fn, null);
}
@Override
public void unregisterNode(String nodeId) throws Exception {
- ClusterControllerFunctions.UnregisterNodeFunction fn = new ClusterControllerFunctions.UnregisterNodeFunction(
+ CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(
nodeId);
ipcHandle.send(-1, fn, null);
}
@@ -51,56 +51,56 @@
@Override
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception {
- ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
+ CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(
jobId, taskId, nodeId, statistics);
ipcHandle.send(-1, fn, null);
}
@Override
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
- ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
+ CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(
jobId, taskId, nodeId, details);
ipcHandle.send(-1, fn, null);
}
@Override
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
- ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
+ CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(
jobId, nodeId);
ipcHandle.send(-1, fn, null);
}
@Override
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
- ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
+ CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id,
hbData);
ipcHandle.send(-1, fn, null);
}
@Override
public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
- ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
+ CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id,
profiles);
ipcHandle.send(-1, fn, null);
}
@Override
public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
- ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
+ CCNCFunctions.RegisterPartitionProviderFunction fn = new CCNCFunctions.RegisterPartitionProviderFunction(
partitionDescriptor);
ipcHandle.send(-1, fn, null);
}
@Override
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
- ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
+ CCNCFunctions.RegisterPartitionRequestFunction fn = new CCNCFunctions.RegisterPartitionRequestFunction(
partitionRequest);
ipcHandle.send(-1, fn, null);
}
@Override
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception {
- ClusterControllerFunctions.ApplicationStateChangeResponseFunction fn = new ClusterControllerFunctions.ApplicationStateChangeResponseFunction(
+ CCNCFunctions.ApplicationStateChangeResponseFunction fn = new CCNCFunctions.ApplicationStateChangeResponseFunction(
nodeId, appName, status);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
deleted file mode 100644
index b72d4e5..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.common.ipc;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
-import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-
-public class NodeControllerFunctions {
- public enum FunctionId {
- NODE_REGISTRATION_RESULT,
- START_TASKS,
- ABORT_TASKS,
- CLEANUP_JOBLET,
- CREATE_APPLICATION,
- DESTROY_APPLICATION,
- REPORT_PARTITION_AVAILABILITY
- }
-
- public static abstract class Function implements Serializable {
- private static final long serialVersionUID = 1L;
-
- public abstract FunctionId getFunctionId();
- }
-
- public static class NodeRegistrationResult extends Function {
- private static final long serialVersionUID = 1L;
-
- private final NodeParameters params;
-
- private final Exception exception;
-
- public NodeRegistrationResult(NodeParameters params, Exception exception) {
- this.params = params;
- this.exception = exception;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.NODE_REGISTRATION_RESULT;
- }
-
- public NodeParameters getNodeParameters() {
- return params;
- }
-
- public Exception getException() {
- return exception;
- }
- }
-
- public static class StartTasksFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String appName;
- private final JobId jobId;
- private final byte[] planBytes;
- private final List<TaskAttemptDescriptor> taskDescriptors;
- private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
-
- public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
- List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
- this.appName = appName;
- this.jobId = jobId;
- this.planBytes = planBytes;
- this.taskDescriptors = taskDescriptors;
- this.connectorPolicies = connectorPolicies;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.START_TASKS;
- }
-
- public String getAppName() {
- return appName;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public byte[] getPlanBytes() {
- return planBytes;
- }
-
- public List<TaskAttemptDescriptor> getTaskDescriptors() {
- return taskDescriptors;
- }
-
- public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
- return connectorPolicies;
- }
- }
-
- public static class AbortTasksFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
- private final List<TaskAttemptId> tasks;
-
- public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
- this.jobId = jobId;
- this.tasks = tasks;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.ABORT_TASKS;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public List<TaskAttemptId> getTasks() {
- return tasks;
- }
- }
-
- public static class CleanupJobletFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
- private final JobStatus status;
-
- public CleanupJobletFunction(JobId jobId, JobStatus status) {
- this.jobId = jobId;
- this.status = status;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.CLEANUP_JOBLET;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public JobStatus getStatus() {
- return status;
- }
- }
-
- public static class CreateApplicationFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String appName;
- private final boolean deployHar;
- private final byte[] serializedDistributedState;
-
- public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
- this.appName = appName;
- this.deployHar = deployHar;
- this.serializedDistributedState = serializedDistributedState;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.CREATE_APPLICATION;
- }
-
- public String getAppName() {
- return appName;
- }
-
- public boolean isDeployHar() {
- return deployHar;
- }
-
- public byte[] getSerializedDistributedState() {
- return serializedDistributedState;
- }
- }
-
- public static class DestroyApplicationFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String appName;
-
- public DestroyApplicationFunction(String appName) {
- this.appName = appName;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.DESTROY_APPLICATION;
- }
-
- public String getAppName() {
- return appName;
- }
- }
-
- public static class ReportPartitionAvailabilityFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final PartitionId pid;
- private final NetworkAddress networkAddress;
-
- public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
- this.pid = pid;
- this.networkAddress = networkAddress;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REPORT_PARTITION_AVAILABILITY;
- }
-
- public PartitionId getPartitionId() {
- return pid;
- }
-
- public NetworkAddress getNetworkAddress() {
- return networkAddress;
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 474fb93..961a9e3 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -38,20 +38,20 @@
@Override
public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
- NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
+ CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(appName, jobId,
planBytes, taskDescriptors, connectorPolicies);
ipcHandle.send(-1, stf, null);
}
@Override
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
- NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
+ CCNCFunctions.AbortTasksFunction atf = new CCNCFunctions.AbortTasksFunction(jobId, tasks);
ipcHandle.send(-1, atf, null);
}
@Override
public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
- NodeControllerFunctions.CleanupJobletFunction cjf = new NodeControllerFunctions.CleanupJobletFunction(jobId,
+ CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId,
status);
ipcHandle.send(-1, cjf, null);
}
@@ -59,21 +59,21 @@
@Override
public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
throws Exception {
- NodeControllerFunctions.CreateApplicationFunction caf = new NodeControllerFunctions.CreateApplicationFunction(
+ CCNCFunctions.CreateApplicationFunction caf = new CCNCFunctions.CreateApplicationFunction(
appName, deployHar, serializedDistributedState);
ipcHandle.send(-1, caf, null);
}
@Override
public void destroyApplication(String appName) throws Exception {
- NodeControllerFunctions.DestroyApplicationFunction daf = new NodeControllerFunctions.DestroyApplicationFunction(
+ CCNCFunctions.DestroyApplicationFunction daf = new CCNCFunctions.DestroyApplicationFunction(
appName);
ipcHandle.send(-1, daf, null);
}
@Override
public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
- NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = new NodeControllerFunctions.ReportPartitionAvailabilityFunction(
+ CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = new CCNCFunctions.ReportPartitionAvailabilityFunction(
pid, networkAddress);
ipcHandle.send(-1, rpaf, null);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index ba1d970..0f512a7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -50,8 +50,8 @@
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
-import edu.uci.ics.hyracks.control.common.ipc.NodeControllerFunctions;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.work.FutureValue;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
@@ -124,7 +124,8 @@
id = ncConfig.nodeId;
executor = Executors.newCachedThreadPool();
NodeControllerIPCI ipci = new NodeControllerIPCI();
- ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci);
+ ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
+ new CCNCFunctions.SerializerDeserializer());
this.ctx = new RootHyracksContext(ncConfig.frameSize, new IOManager(getDevices(ncConfig.ioDevices), executor));
if (id == null) {
throw new Exception("id not set");
@@ -347,49 +348,49 @@
private final class NodeControllerIPCI implements IIPCI {
@Override
public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
- NodeControllerFunctions.Function fn = (NodeControllerFunctions.Function) payload;
+ CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
switch (fn.getFunctionId()) {
case START_TASKS: {
- NodeControllerFunctions.StartTasksFunction stf = (NodeControllerFunctions.StartTasksFunction) fn;
+ CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies()));
return;
}
case ABORT_TASKS: {
- NodeControllerFunctions.AbortTasksFunction atf = (NodeControllerFunctions.AbortTasksFunction) fn;
+ CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
return;
}
case CLEANUP_JOBLET: {
- NodeControllerFunctions.CleanupJobletFunction cjf = (NodeControllerFunctions.CleanupJobletFunction) fn;
+ CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
return;
}
case CREATE_APPLICATION: {
- NodeControllerFunctions.CreateApplicationFunction caf = (NodeControllerFunctions.CreateApplicationFunction) fn;
+ CCNCFunctions.CreateApplicationFunction caf = (CCNCFunctions.CreateApplicationFunction) fn;
queue.schedule(new CreateApplicationWork(NodeControllerService.this, caf.getAppName(), caf
.isDeployHar(), caf.getSerializedDistributedState()));
return;
}
case DESTROY_APPLICATION: {
- NodeControllerFunctions.DestroyApplicationFunction daf = (NodeControllerFunctions.DestroyApplicationFunction) fn;
+ CCNCFunctions.DestroyApplicationFunction daf = (CCNCFunctions.DestroyApplicationFunction) fn;
queue.schedule(new DestroyApplicationWork(NodeControllerService.this, daf.getAppName()));
return;
}
case REPORT_PARTITION_AVAILABILITY: {
- NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = (NodeControllerFunctions.ReportPartitionAvailabilityFunction) fn;
+ CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
.getPartitionId(), rpaf.getNetworkAddress()));
return;
}
case NODE_REGISTRATION_RESULT: {
- NodeControllerFunctions.NodeRegistrationResult nrrf = (NodeControllerFunctions.NodeRegistrationResult) fn;
+ CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
return;
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index f94be22..1a0a820 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -60,7 +60,7 @@
public void start() throws IOException {
md.start();
InetSocketAddress sockAddr = md.getLocalAddress();
- networkAddress = new NetworkAddress(sockAddr.getAddress(), sockAddr.getPort());
+ networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
}
public NetworkAddress getNetworkAddress() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index ce3f4e0..9734567 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.nc.work;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.logging.Logger;
@@ -48,8 +49,8 @@
Joblet ji = jobletMap.get(pid.getJobId());
if (ji != null) {
PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
- ncs.getNetworkManager(), new InetSocketAddress(networkAddress.getIpAddress(),
- networkAddress.getPort()), pid, 5));
+ ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+ .getIpAddress()), networkAddress.getPort()), pid, 5));
ji.reportPartitionAvailability(channel);
}
}
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java
new file mode 100644
index 0000000..62648ff
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.nio.ByteBuffer;
+
+public interface IPayloadSerializerDeserializer {
+ public Object deserializeObject(ByteBuffer buffer, int length) throws Exception;
+
+ public Exception deserializeException(ByteBuffer buffer, int length) throws Exception;
+
+ public byte[] serializeObject(Object object) throws Exception;
+
+ public byte[] serializeException(Exception object) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index 09e8c2f..053ac6b 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -276,7 +276,7 @@
}
}
}
- } catch (IOException e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index a01b1d9..3d3bc7a 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -54,6 +54,10 @@
return remoteAddress;
}
+ IPCSystem getIPCSystem() {
+ return system;
+ }
+
void setRemoteAddress(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 5f60c6d..6c9c82c 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
public class IPCSystem {
@@ -27,15 +28,15 @@
private final IIPCI ipci;
+ private final IPayloadSerializerDeserializer serde;
+
private final AtomicLong midFactory;
- public IPCSystem(InetSocketAddress socketAddress) throws IOException {
- this(socketAddress, null);
- }
-
- public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci) throws IOException {
+ public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, IPayloadSerializerDeserializer serde)
+ throws IOException {
cMgr = new IPCConnectionManager(this, socketAddress);
this.ipci = ipci;
+ this.serde = serde;
midFactory = new AtomicLong();
}
@@ -57,6 +58,10 @@
}
}
+ IPayloadSerializerDeserializer getSerializerDeserializer() {
+ return serde;
+ }
+
long createMessageId() {
return midFactory.incrementAndGet();
}
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
new file mode 100644
index 0000000..fdf8e92
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+
+public class JavaSerializationBasedPayloadSerializerDeserializer implements IPayloadSerializerDeserializer {
+ @Override
+ public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+ return deserialize(buffer, length);
+ }
+
+ @Override
+ public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+ return (Exception) deserialize(buffer, length);
+ }
+
+ @Override
+ public byte[] serializeObject(Object object) throws Exception {
+ return serialize(object);
+ }
+
+ @Override
+ public byte[] serializeException(Exception exception) throws Exception {
+ return serialize(exception);
+ }
+
+ public static void serialize(OutputStream out, Object object) throws Exception {
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ oos.writeObject(object);
+ oos.flush();
+ }
+
+ private Object deserialize(ByteBuffer buffer, int length) throws Exception {
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
+ length));
+ Object object = ois.readObject();
+ ois.close();
+ return object;
+ }
+
+ private byte[] serialize(Object object) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serialize(baos, object);
+ baos.close();
+ return baos.toByteArray();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
index 6bff56f..6bb3156 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
@@ -14,13 +14,10 @@
*/
package edu.uci.ics.hyracks.ipc.impl;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+
class Message {
private static final int MSG_SIZE_SIZE = 4;
@@ -92,29 +89,26 @@
return buffer.remaining() >= msgSize + MSG_SIZE_SIZE;
}
- void read(ByteBuffer buffer) throws IOException, ClassNotFoundException {
+ void read(ByteBuffer buffer) throws Exception {
assert hasMessage(buffer);
int msgSize = buffer.getInt();
messageId = buffer.getLong();
requestMessageId = buffer.getLong();
flag = buffer.get();
int finalPosition = buffer.position() + msgSize - HEADER_SIZE;
+ int length = msgSize - HEADER_SIZE;
try {
- ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
- msgSize - HEADER_SIZE));
- payload = ois.readObject();
- ois.close();
+ IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
+ payload = flag == ERROR ? serde.deserializeException(buffer, length) : serde.deserializeObject(buffer,
+ length);
} finally {
buffer.position(finalPosition);
}
}
- boolean write(ByteBuffer buffer) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(payload);
- oos.close();
- byte[] bytes = baos.toByteArray();
+ boolean write(ByteBuffer buffer) throws Exception {
+ IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
+ byte[] bytes = flag == ERROR ? serde.serializeException((Exception) payload) : serde.serializeObject(payload);
if (buffer.remaining() >= MSG_SIZE_SIZE + HEADER_SIZE + bytes.length) {
buffer.putInt(HEADER_SIZE + bytes.length);
buffer.putLong(messageId);
diff --git a/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
index dac93dd..5b2f660 100644
--- a/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.ipc.api.RPCInterface;
import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
public class IPCTest {
@Test
@@ -80,10 +81,12 @@
});
}
};
- return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci);
+ return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci,
+ new JavaSerializationBasedPayloadSerializerDeserializer());
}
private IPCSystem createClientIPCSystem(RPCInterface rpci) throws IOException {
- return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci);
+ return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci,
+ new JavaSerializationBasedPayloadSerializerDeserializer());
}
}
\ No newline at end of file