Merged hyracks_dev_next into this branch.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1064 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index b474ee1..8c9409c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.RPCInterface;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
/**
* Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
@@ -65,7 +66,7 @@
public HyracksConnection(String ccHost, int ccPort) throws Exception {
this.ccHost = ccHost;
RPCInterface rpci = new RPCInterface();
- ipc = new IPCSystem(new InetSocketAddress(0), rpci);
+ ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
ipc.start();
IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle, rpci);
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
index 1424a63..703f74b 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
@@ -15,20 +15,16 @@
package edu.uci.ics.hyracks.api.client;
import java.io.Serializable;
-import java.net.InetAddress;
public class NodeControllerInfo implements Serializable {
private static final long serialVersionUID = 1L;
private final String nodeId;
- private final InetAddress ipAddress;
-
private final NodeStatus status;
- public NodeControllerInfo(String nodeId, InetAddress ipAddress, NodeStatus status) {
+ public NodeControllerInfo(String nodeId, NodeStatus status) {
this.nodeId = nodeId;
- this.ipAddress = ipAddress;
this.status = status;
}
@@ -36,10 +32,6 @@
return nodeId;
}
- public InetAddress getIpAddress() {
- return ipAddress;
- }
-
public NodeStatus getStatus() {
return status;
}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
index 868221d..d176c3c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
@@ -15,21 +15,20 @@
package edu.uci.ics.hyracks.api.comm;
import java.io.Serializable;
-import java.net.InetAddress;
public final class NetworkAddress implements Serializable {
private static final long serialVersionUID = 1L;
- private final InetAddress ipAddress;
+ private final byte[] ipAddress;
private final int port;
- public NetworkAddress(InetAddress ipAddress, int port) {
+ public NetworkAddress(byte[] ipAddress, int port) {
this.ipAddress = ipAddress;
this.port = port;
}
- public InetAddress getIpAddress() {
+ public byte[] getIpAddress() {
return ipAddress;
}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index 75cc245..41b4c23 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -19,9 +19,9 @@
public final class ActivityId implements Serializable {
private static final long serialVersionUID = 1L;
private final OperatorDescriptorId odId;
- private final long id;
+ private final int id;
- public ActivityId(OperatorDescriptorId odId, long id) {
+ public ActivityId(OperatorDescriptorId odId, int id) {
this.odId = odId;
this.id = id;
}
@@ -30,7 +30,7 @@
return odId;
}
- public long getLocalId() {
+ public int getLocalId() {
return id;
}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index ec8ab76..c816d26 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -44,7 +44,8 @@
public ConnectorDescriptorId getConnectorId();
/**
- * Factory method to create the send side writer that writes into this connector.
+ * Factory method to create the send side writer that writes into this
+ * connector.
*
* @param ctx
* Context
@@ -66,7 +67,8 @@
throws HyracksDataException;
/**
- * Factory metod to create the receive side reader that reads data from this connector.
+ * Factory method to create the receive side reader that reads data from this
+ * connector.
*
* @param ctx
* Context
@@ -96,12 +98,23 @@
ICCApplicationContext appCtx);
/**
- * Indicate which consumer partitions may receive data from the given producer partition.
+ * Indicate which consumer partitions may receive data from the given
+ * producer partition.
*/
public void indicateTargetPartitions(int nProducerPartitions, int nConsumerPartitions, int producerIndex,
BitSet targetBitmap);
/**
+ * Gets the display name.
+ */
+ public String getDisplayName();
+
+ /**
+ * Sets the display name.
+ */
+ public void setDisplayName(String displayName);
+
+ /**
* Translate this connector descriptor to JSON.
*
* @return
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index 78847eb..c37a530 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -59,7 +59,8 @@
public RecordDescriptor[] getOutputRecordDescriptors();
/**
- * Contributes the activity graph that describes the behavior of this operator.
+ * Contributes the activity graph that describes the behavior of this
+ * operator.
*
* @param builder
* - graph builder
@@ -78,6 +79,16 @@
ICCApplicationContext appCtx);
/**
+ * Gets the display name.
+ */
+ public String getDisplayName();
+
+ /**
+ * Sets the display name.
+ */
+ public void setDisplayName(String displayName);
+
+ /**
* Translates this operator descriptor to JSON.
*/
public JSONObject toJSON() throws JSONException;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 2578b88..d3e472b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -60,8 +60,8 @@
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions.Function;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
import edu.uci.ics.hyracks.control.common.logs.LogFile;
import edu.uci.ics.hyracks.control.common.work.IPCResponder;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
@@ -69,6 +69,7 @@
import edu.uci.ics.hyracks.ipc.api.IIPCI;
import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
public class ClusterControllerService extends AbstractRemoteService {
private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
@@ -119,9 +120,11 @@
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
executor = Executors.newCachedThreadPool();
IIPCI ccIPCI = new ClusterControllerIPCI();
- clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI);
+ clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
+ new CCNCFunctions.SerializerDeserializer());
IIPCI ciIPCI = new HyracksClientInterfaceIPCI();
- clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI);
+ clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
+ new JavaSerializationBasedPayloadSerializerDeserializer());
webServer = new WebServer(this);
activeRunMap = new HashMap<JobId, JobRun>();
runMapArchive = new LinkedHashMap<JobId, JobRun>() {
@@ -317,69 +320,69 @@
private class ClusterControllerIPCI implements IIPCI {
@Override
public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
- ClusterControllerFunctions.Function fn = (Function) payload;
+ CCNCFunctions.Function fn = (Function) payload;
switch (fn.getFunctionId()) {
case REGISTER_NODE: {
- ClusterControllerFunctions.RegisterNodeFunction rnf = (ClusterControllerFunctions.RegisterNodeFunction) fn;
+ CCNCFunctions.RegisterNodeFunction rnf = (CCNCFunctions.RegisterNodeFunction) fn;
workQueue.schedule(new RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration()));
return;
}
case UNREGISTER_NODE: {
- ClusterControllerFunctions.UnregisterNodeFunction unf = (ClusterControllerFunctions.UnregisterNodeFunction) fn;
+ CCNCFunctions.UnregisterNodeFunction unf = (CCNCFunctions.UnregisterNodeFunction) fn;
workQueue.schedule(new UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId()));
return;
}
case NODE_HEARTBEAT: {
- ClusterControllerFunctions.NodeHeartbeatFunction nhf = (ClusterControllerFunctions.NodeHeartbeatFunction) fn;
+ CCNCFunctions.NodeHeartbeatFunction nhf = (CCNCFunctions.NodeHeartbeatFunction) fn;
workQueue.schedule(new NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(), nhf
.getHeartbeatData()));
return;
}
case NOTIFY_JOBLET_CLEANUP: {
- ClusterControllerFunctions.NotifyJobletCleanupFunction njcf = (ClusterControllerFunctions.NotifyJobletCleanupFunction) fn;
+ CCNCFunctions.NotifyJobletCleanupFunction njcf = (CCNCFunctions.NotifyJobletCleanupFunction) fn;
workQueue.schedule(new JobletCleanupNotificationWork(ClusterControllerService.this,
njcf.getJobId(), njcf.getNodeId()));
return;
}
case REPORT_PROFILE: {
- ClusterControllerFunctions.ReportProfileFunction rpf = (ClusterControllerFunctions.ReportProfileFunction) fn;
+ CCNCFunctions.ReportProfileFunction rpf = (CCNCFunctions.ReportProfileFunction) fn;
workQueue.schedule(new ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
return;
}
case NOTIFY_TASK_COMPLETE: {
- ClusterControllerFunctions.NotifyTaskCompleteFunction ntcf = (ClusterControllerFunctions.NotifyTaskCompleteFunction) fn;
+ CCNCFunctions.NotifyTaskCompleteFunction ntcf = (CCNCFunctions.NotifyTaskCompleteFunction) fn;
workQueue.schedule(new TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(), ntcf
.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics()));
return;
}
case NOTIFY_TASK_FAILURE: {
- ClusterControllerFunctions.NotifyTaskFailureFunction ntff = (ClusterControllerFunctions.NotifyTaskFailureFunction) fn;
+ CCNCFunctions.NotifyTaskFailureFunction ntff = (CCNCFunctions.NotifyTaskFailureFunction) fn;
workQueue.schedule(new TaskFailureWork(ClusterControllerService.this, ntff.getJobId(), ntff
.getTaskId(), ntff.getDetails(), ntff.getDetails()));
return;
}
case REGISTER_PARTITION_PROVIDER: {
- ClusterControllerFunctions.RegisterPartitionProviderFunction rppf = (ClusterControllerFunctions.RegisterPartitionProviderFunction) fn;
+ CCNCFunctions.RegisterPartitionProviderFunction rppf = (CCNCFunctions.RegisterPartitionProviderFunction) fn;
workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this, rppf
.getPartitionDescriptor()));
return;
}
case REGISTER_PARTITION_REQUEST: {
- ClusterControllerFunctions.RegisterPartitionRequestFunction rprf = (ClusterControllerFunctions.RegisterPartitionRequestFunction) fn;
+ CCNCFunctions.RegisterPartitionRequestFunction rprf = (CCNCFunctions.RegisterPartitionRequestFunction) fn;
workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this, rprf
.getPartitionRequest()));
return;
}
case APPLICATION_STATE_CHANGE_RESPONSE: {
- ClusterControllerFunctions.ApplicationStateChangeResponseFunction astrf = (ClusterControllerFunctions.ApplicationStateChangeResponseFunction) fn;
+ CCNCFunctions.ApplicationStateChangeResponseFunction astrf = (CCNCFunctions.ApplicationStateChangeResponseFunction) fn;
workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
return;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 929462b..e6e6c2f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -49,6 +49,8 @@
import edu.uci.ics.hyracks.control.cc.job.TaskClusterId;
public class ActivityClusterPlanner {
+ private static final boolean USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION = true;
+
private static final Logger LOGGER = Logger.getLogger(ActivityClusterPlanner.class.getName());
private final JobScheduler scheduler;
@@ -64,13 +66,28 @@
JobRun jobRun = scheduler.getJobRun();
Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
+ Map<ActivityId, ActivityPlan> activityPlanMap = buildActivityPlanMap(ac, jobRun, pcMap);
+
+ assignConnectorPolicy(ac, activityPlanMap);
+
+ TaskCluster[] taskClusters = computeTaskClusters(ac, jobRun, activityPlanMap);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Plan for " + ac);
+ LOGGER.info("Built " + taskClusters.length + " Task Clusters");
+ for (TaskCluster tc : taskClusters) {
+ LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+ }
+ }
+
+ ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
+ }
+
+ private Map<ActivityId, ActivityPlan> buildActivityPlanMap(ActivityCluster ac, JobRun jobRun,
+ Map<ActivityId, ActivityPartitionDetails> pcMap) {
Map<ActivityId, ActivityPlan> activityPlanMap = new HashMap<ActivityId, ActivityPlan>();
- Set<ActivityId> activities = ac.getActivities();
-
- Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
-
Set<ActivityId> depAnIds = new HashSet<ActivityId>();
- for (ActivityId anId : activities) {
+ for (ActivityId anId : ac.getActivities()) {
depAnIds.clear();
getDependencyActivityIds(depAnIds, anId);
ActivityPartitionDetails apd = pcMap.get(anId);
@@ -94,53 +111,21 @@
tasks[i].getDependencies().add(dTaskId);
dTask.getDependents().add(tid);
}
- Set<TaskId> cluster = new HashSet<TaskId>();
- cluster.add(tid);
- taskClusterMap.put(tid, cluster);
}
activityPlan.setTasks(tasks);
activityPlanMap.put(anId, activityPlan);
}
+ return activityPlanMap;
+ }
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, activityPlanMap);
- scheduler.getJobRun().getConnectorPolicyMap().putAll(connectorPolicies);
+ private TaskCluster[] computeTaskClusters(ActivityCluster ac, JobRun jobRun,
+ Map<ActivityId, ActivityPlan> activityPlanMap) {
+ Set<ActivityId> activities = ac.getActivities();
+ Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
+ activityPlanMap, activities);
- Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
- JobActivityGraph jag = jobRun.getJobActivityGraph();
- BitSet targetBitmap = new BitSet();
- for (ActivityId ac1 : activities) {
- Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
- int nProducers = ac1TaskStates.length;
- List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
- if (outputConns != null) {
- for (IConnectorDescriptor c : outputConns) {
- ConnectorDescriptorId cdId = c.getConnectorId();
- ActivityId ac2 = jag.getConsumerActivity(cdId);
- Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
- int nConsumers = ac2TaskStates.length;
- for (int i = 0; i < nProducers; ++i) {
- c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
- List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
- .getTaskId());
- if (cInfoList == null) {
- cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
- taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
- }
- Set<TaskId> cluster = taskClusterMap.get(ac1TaskStates[i].getTaskId());
- for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
- TaskId targetTID = ac2TaskStates[j].getTaskId();
- cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
- IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
- if (cPolicy.requiresProducerConsumerCoscheduling()) {
- cluster.add(targetTID);
- }
- }
- }
- }
- }
- }
-
- TaskCluster[] taskClusters = buildTaskClusters(ac, activityPlanMap, taskClusterMap);
+ TaskCluster[] taskClusters = USE_CONNECTOR_POLICY_IN_TASK_CLUSTER_CONSTRUCTION ? buildConnectorPolicyAwareTaskClusters(
+ ac, activityPlanMap, taskConnectivity) : buildConnectorPolicyUnawareTaskClusters(ac, activityPlanMap);
for (TaskCluster tc : taskClusters) {
Set<TaskCluster> tcDependencyTaskClusters = tc.getDependencyTaskClusters();
@@ -170,20 +155,88 @@
}
}
}
+ return taskClusters;
+ }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Plan for " + ac);
- LOGGER.info("Built " + taskClusters.length + " Task Clusters");
- for (TaskCluster tc : taskClusters) {
- LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+ private TaskCluster[] buildConnectorPolicyUnawareTaskClusters(ActivityCluster ac,
+ Map<ActivityId, ActivityPlan> activityPlanMap) {
+ List<Task> taskStates = new ArrayList<Task>();
+ for (ActivityId anId : ac.getActivities()) {
+ ActivityPlan ap = activityPlanMap.get(anId);
+ Task[] tasks = ap.getTasks();
+ for (Task t : tasks) {
+ taskStates.add(t);
+ }
+ }
+ TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getActivityClusterId(), 0), ac,
+ taskStates.toArray(new Task[taskStates.size()]));
+ for (Task t : tc.getTasks()) {
+ t.setTaskCluster(tc);
+ }
+ return new TaskCluster[] { tc };
+ }
+
+ private Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> computeTaskConnectivity(JobRun jobRun,
+ Map<ActivityId, ActivityPlan> activityPlanMap, Set<ActivityId> activities) {
+ Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
+ JobActivityGraph jag = jobRun.getJobActivityGraph();
+ BitSet targetBitmap = new BitSet();
+ for (ActivityId ac1 : activities) {
+ Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
+ int nProducers = ac1TaskStates.length;
+ List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+ if (outputConns != null) {
+ for (IConnectorDescriptor c : outputConns) {
+ ConnectorDescriptorId cdId = c.getConnectorId();
+ ActivityId ac2 = jag.getConsumerActivity(cdId);
+ Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
+ int nConsumers = ac2TaskStates.length;
+ for (int i = 0; i < nProducers; ++i) {
+ c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+ List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
+ .getTaskId());
+ if (cInfoList == null) {
+ cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+ taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+ }
+ for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+ TaskId targetTID = ac2TaskStates[j].getTaskId();
+ cInfoList.add(Pair.<TaskId, ConnectorDescriptorId> of(targetTID, cdId));
+ }
+ }
+ }
+ }
+ }
+ return taskConnectivity;
+ }
+
+ private TaskCluster[] buildConnectorPolicyAwareTaskClusters(ActivityCluster ac,
+ Map<ActivityId, ActivityPlan> activityPlanMap,
+ Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity) {
+ Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
+ for (ActivityId anId : ac.getActivities()) {
+ ActivityPlan ap = activityPlanMap.get(anId);
+ Task[] tasks = ap.getTasks();
+ for (Task t : tasks) {
+ Set<TaskId> cluster = new HashSet<TaskId>();
+ TaskId tid = t.getTaskId();
+ cluster.add(tid);
+ taskClusterMap.put(tid, cluster);
}
}
- ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
- }
+ JobRun jobRun = ac.getJobRun();
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = jobRun.getConnectorPolicyMap();
+ for (Map.Entry<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> e : taskConnectivity.entrySet()) {
+ Set<TaskId> cluster = taskClusterMap.get(e.getKey());
+ for (Pair<TaskId, ConnectorDescriptorId> p : e.getValue()) {
+ IConnectorPolicy cPolicy = connectorPolicies.get(p.getRight());
+ if (cPolicy.requiresProducerConsumerCoscheduling()) {
+ cluster.add(p.getLeft());
+ }
+ }
+ }
- private TaskCluster[] buildTaskClusters(ActivityCluster ac, Map<ActivityId, ActivityPlan> activityPlanMap,
- Map<TaskId, Set<TaskId>> taskClusterMap) {
/*
* taskClusterMap contains for every TID x, x -> { coscheduled consumer TIDs U x }
* We compute the transitive closure of this relation to find the largest set of
@@ -273,8 +326,7 @@
}
}
- private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
- Map<ActivityId, ActivityPlan> taskMap) {
+ private void assignConnectorPolicy(ActivityCluster ac, Map<ActivityId, ActivityPlan> taskMap) {
JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
Set<ActivityId> activities = ac.getActivities();
@@ -300,7 +352,7 @@
}
}
}
- return cPolicyMap;
+ ac.getJobRun().getConnectorPolicyMap().putAll(cPolicyMap);
}
private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
index 2d8ae85..f6271fe 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationStateChangeWork.java
@@ -6,7 +6,7 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
-import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.common.work.IResultCallback;
@@ -14,10 +14,10 @@
private static final Logger LOGGER = Logger.getLogger(ApplicationStateChangeWork.class.getName());
private final ClusterControllerService ccs;
- private final ClusterControllerFunctions.ApplicationStateChangeResponseFunction ascrf;
+ private final CCNCFunctions.ApplicationStateChangeResponseFunction ascrf;
public ApplicationStateChangeWork(ClusterControllerService ccs,
- ClusterControllerFunctions.ApplicationStateChangeResponseFunction ascrf) {
+ CCNCFunctions.ApplicationStateChangeResponseFunction ascrf) {
this.ccs = ccs;
this.ascrf = ascrf;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 49a3be9..9e8d130 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -39,8 +39,7 @@
Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
- result.put(e.getKey(), new NodeControllerInfo(e.getKey(), e.getValue().getDataPort().getIpAddress(),
- NodeStatus.ALIVE));
+ result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE));
}
callback.setValue(result);
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
index cba4b4b..e4e0aae 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterNodeWork.java
@@ -25,7 +25,7 @@
import edu.uci.ics.hyracks.control.common.base.INodeController;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.ipc.NodeControllerFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
import edu.uci.ics.hyracks.control.common.ipc.NodeControllerRemoteProxy;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
@@ -46,7 +46,7 @@
String id = reg.getNodeId();
IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
- NodeControllerFunctions.NodeRegistrationResult result = null;
+ CCNCFunctions.NodeRegistrationResult result = null;
try {
INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
@@ -69,9 +69,9 @@
params.setClusterControllerInfo(ccs.getClusterControllerInfo());
params.setHeartbeatPeriod(ccs.getCCConfig().heartbeatPeriod);
params.setProfileDumpPeriod(ccs.getCCConfig().profileDumpPeriod);
- result = new NodeControllerFunctions.NodeRegistrationResult(params, null);
+ result = new CCNCFunctions.NodeRegistrationResult(params, null);
} catch (Exception e) {
- result = new NodeControllerFunctions.NodeRegistrationResult(null, e);
+ result = new CCNCFunctions.NodeRegistrationResult(null, e);
}
ncIPCHandle.send(-1, result, null);
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
new file mode 100644
index 0000000..9ea4636
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -0,0 +1,792 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+public class CCNCFunctions {
+ private static final int FID_CODE_SIZE = 1;
+
+ public enum FunctionId {
+ REGISTER_NODE,
+ UNREGISTER_NODE,
+ NOTIFY_JOBLET_CLEANUP,
+ NOTIFY_TASK_COMPLETE,
+ NOTIFY_TASK_FAILURE,
+ NODE_HEARTBEAT,
+ REPORT_PROFILE,
+ REGISTER_PARTITION_PROVIDER,
+ REGISTER_PARTITION_REQUEST,
+ APPLICATION_STATE_CHANGE_RESPONSE,
+
+ NODE_REGISTRATION_RESULT,
+ START_TASKS,
+ ABORT_TASKS,
+ CLEANUP_JOBLET,
+ CREATE_APPLICATION,
+ DESTROY_APPLICATION,
+ REPORT_PARTITION_AVAILABILITY,
+
+ OTHER
+ }
+
+ public static abstract class Function implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public abstract FunctionId getFunctionId();
+ }
+
+ public static class RegisterNodeFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final NodeRegistration reg;
+
+ public RegisterNodeFunction(NodeRegistration reg) {
+ this.reg = reg;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_NODE;
+ }
+
+ public NodeRegistration getNodeRegistration() {
+ return reg;
+ }
+ }
+
+ public static class UnregisterNodeFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+
+ public UnregisterNodeFunction(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.UNREGISTER_NODE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ public static class NotifyTaskCompleteFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final TaskAttemptId taskId;
+ private final String nodeId;
+ private final TaskProfile statistics;
+
+ public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
+ this.jobId = jobId;
+ this.taskId = taskId;
+ this.nodeId = nodeId;
+ this.statistics = statistics;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_TASK_COMPLETE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public TaskAttemptId getTaskId() {
+ return taskId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public TaskProfile getStatistics() {
+ return statistics;
+ }
+ }
+
+ public static class NotifyTaskFailureFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final TaskAttemptId taskId;
+ private final String nodeId;
+ private final String details;
+
+ public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
+ this.jobId = jobId;
+ this.taskId = taskId;
+ this.nodeId = nodeId;
+ this.details = details;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_TASK_FAILURE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public TaskAttemptId getTaskId() {
+ return taskId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public String getDetails() {
+ return details;
+ }
+ }
+
+ public static class NotifyJobletCleanupFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final String nodeId;
+
+ public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
+ this.jobId = jobId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_JOBLET_CLEANUP;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ public static class NodeHeartbeatFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final HeartbeatData hbData;
+
+ public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+ this.nodeId = nodeId;
+ this.hbData = hbData;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NODE_HEARTBEAT;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public HeartbeatData getHeartbeatData() {
+ return hbData;
+ }
+ }
+
+ public static class ReportProfileFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final List<JobProfile> profiles;
+
+ public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
+ this.nodeId = nodeId;
+ this.profiles = profiles;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_PROFILE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public List<JobProfile> getProfiles() {
+ return profiles;
+ }
+ }
+
+ public static class RegisterPartitionProviderFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionDescriptor partitionDescriptor;
+
+ public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
+ this.partitionDescriptor = partitionDescriptor;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_PARTITION_PROVIDER;
+ }
+
+ public PartitionDescriptor getPartitionDescriptor() {
+ return partitionDescriptor;
+ }
+
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+ DataInputStream dis = new DataInputStream(bais);
+
+ // Read PartitionId
+ PartitionId pid = readPartitionId(dis);
+
+ // Read nodeId
+ String nodeId = dis.readUTF();
+
+ // Read TaskAttemptId
+ TaskAttemptId taId = readTaskAttemptId(dis);
+
+ // Read reusable flag
+ boolean reusable = dis.readBoolean();
+
+ // Read Partition State
+ PartitionState state = readPartitionState(dis);
+
+ PartitionDescriptor pd = new PartitionDescriptor(pid, nodeId, taId, reusable);
+ pd.setState(state);
+ return new RegisterPartitionProviderFunction(pd);
+ }
+
+ public static void serialize(OutputStream out, Object object) throws Exception {
+ RegisterPartitionProviderFunction fn = (RegisterPartitionProviderFunction) object;
+
+ DataOutputStream dos = new DataOutputStream(out);
+
+ PartitionDescriptor pd = fn.getPartitionDescriptor();
+
+ // Write PartitionId
+ writePartitionId(dos, pd.getPartitionId());
+
+ // Write nodeId
+ dos.writeUTF(pd.getNodeId());
+
+ // Write TaskAttemptId
+ writeTaskAttemptId(dos, pd.getProducingTaskAttemptId());
+
+ // Write reusable flag
+ dos.writeBoolean(pd.isReusable());
+
+ // Write Partition State
+ writePartitionState(dos, pd.getState());
+ }
+ }
+
+ public static class RegisterPartitionRequestFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionRequest partitionRequest;
+
+ public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
+ this.partitionRequest = partitionRequest;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_PARTITION_REQUEST;
+ }
+
+ public PartitionRequest getPartitionRequest() {
+ return partitionRequest;
+ }
+
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+ DataInputStream dis = new DataInputStream(bais);
+
+ // Read PartitionId
+ PartitionId pid = readPartitionId(dis);
+
+ // Read nodeId
+ String nodeId = dis.readUTF();
+
+ // Read TaskAttemptId
+ TaskAttemptId taId = readTaskAttemptId(dis);
+
+ // Read Partition State
+ PartitionState state = readPartitionState(dis);
+
+ PartitionRequest pr = new PartitionRequest(pid, nodeId, taId, state);
+ return new RegisterPartitionRequestFunction(pr);
+ }
+
+ public static void serialize(OutputStream out, Object object) throws Exception {
+ RegisterPartitionRequestFunction fn = (RegisterPartitionRequestFunction) object;
+
+ DataOutputStream dos = new DataOutputStream(out);
+
+ PartitionRequest pr = fn.getPartitionRequest();
+
+ // Write PartitionId
+ writePartitionId(dos, pr.getPartitionId());
+
+ // Write nodeId
+ dos.writeUTF(pr.getNodeId());
+
+ // Write TaskAttemptId
+ writeTaskAttemptId(dos, pr.getRequestingTaskAttemptId());
+
+ // Write Partition State
+ writePartitionState(dos, pr.getMinimumState());
+ }
+ }
+
+ public static class ApplicationStateChangeResponseFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final String appName;
+ private final ApplicationStatus status;
+
+ public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
+ this.nodeId = nodeId;
+ this.appName = appName;
+ this.status = status;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public String getApplicationName() {
+ return appName;
+ }
+
+ public ApplicationStatus getStatus() {
+ return status;
+ }
+ }
+
+ public static class NodeRegistrationResult extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final NodeParameters params;
+
+ private final Exception exception;
+
+ public NodeRegistrationResult(NodeParameters params, Exception exception) {
+ this.params = params;
+ this.exception = exception;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NODE_REGISTRATION_RESULT;
+ }
+
+ public NodeParameters getNodeParameters() {
+ return params;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+ }
+
+ public static class StartTasksFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final JobId jobId;
+ private final byte[] planBytes;
+ private final List<TaskAttemptDescriptor> taskDescriptors;
+ private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+
+ public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
+ List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+ this.appName = appName;
+ this.jobId = jobId;
+ this.planBytes = planBytes;
+ this.taskDescriptors = taskDescriptors;
+ this.connectorPolicies = connectorPolicies;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_TASKS;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public byte[] getPlanBytes() {
+ return planBytes;
+ }
+
+ public List<TaskAttemptDescriptor> getTaskDescriptors() {
+ return taskDescriptors;
+ }
+
+ public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
+ return connectorPolicies;
+ }
+ }
+
+ public static class AbortTasksFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final List<TaskAttemptId> tasks;
+
+ public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
+ this.jobId = jobId;
+ this.tasks = tasks;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.ABORT_TASKS;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public List<TaskAttemptId> getTasks() {
+ return tasks;
+ }
+ }
+
+ public static class CleanupJobletFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final JobStatus status;
+
+ public CleanupJobletFunction(JobId jobId, JobStatus status) {
+ this.jobId = jobId;
+ this.status = status;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CLEANUP_JOBLET;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public JobStatus getStatus() {
+ return status;
+ }
+ }
+
+ public static class CreateApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final boolean deployHar;
+ private final byte[] serializedDistributedState;
+
+ public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
+ this.appName = appName;
+ this.deployHar = deployHar;
+ this.serializedDistributedState = serializedDistributedState;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CREATE_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public boolean isDeployHar() {
+ return deployHar;
+ }
+
+ public byte[] getSerializedDistributedState() {
+ return serializedDistributedState;
+ }
+ }
+
+ public static class DestroyApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public DestroyApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class ReportPartitionAvailabilityFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionId pid;
+ private final NetworkAddress networkAddress;
+
+ public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
+ this.pid = pid;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_PARTITION_AVAILABILITY;
+ }
+
+ public PartitionId getPartitionId() {
+ return pid;
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+ DataInputStream dis = new DataInputStream(bais);
+
+ // Read PartitionId
+ PartitionId pid = readPartitionId(dis);
+
+ // Read NetworkAddress
+ NetworkAddress networkAddress = readNetworkAddress(dis);
+
+ return new ReportPartitionAvailabilityFunction(pid, networkAddress);
+ }
+
+ public static void serialize(OutputStream out, Object object) throws Exception {
+ ReportPartitionAvailabilityFunction fn = (ReportPartitionAvailabilityFunction) object;
+
+ DataOutputStream dos = new DataOutputStream(out);
+
+ // Write PartitionId
+ writePartitionId(dos, fn.getPartitionId());
+
+ // Write NetworkAddress
+ writeNetworkAddress(dos, fn.getNetworkAddress());
+ }
+ }
+
+ public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
+ private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
+
+ public SerializerDeserializer() {
+ javaSerde = new JavaSerializationBasedPayloadSerializerDeserializer();
+ }
+
+ @Override
+ public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+ if (length < FID_CODE_SIZE) {
+ throw new IllegalStateException("Message size too small: " + length);
+ }
+ byte fid = buffer.get();
+ return deserialize(fid, buffer, length - FID_CODE_SIZE);
+ }
+
+ @Override
+ public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+ if (length < FID_CODE_SIZE) {
+ throw new IllegalStateException("Message size too small: " + length);
+ }
+ byte fid = buffer.get();
+ if (fid != FunctionId.OTHER.ordinal()) {
+ throw new IllegalStateException("Expected FID for OTHER, found: " + fid);
+ }
+ return (Exception) deserialize(fid, buffer, length - FID_CODE_SIZE);
+ }
+
+ @Override
+ public byte[] serializeObject(Object object) throws Exception {
+ if (object instanceof Function) {
+ Function fn = (Function) object;
+ return serialize(object, (byte) fn.getFunctionId().ordinal());
+ } else {
+ return serialize(object, (byte) FunctionId.OTHER.ordinal());
+ }
+ }
+
+ @Override
+ public byte[] serializeException(Exception object) throws Exception {
+ return serialize(object, (byte) FunctionId.OTHER.ordinal());
+ }
+
+ private byte[] serialize(Object object, byte fid) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ baos.write(fid);
+ serialize(baos, object, fid);
+ JavaSerializationBasedPayloadSerializerDeserializer.serialize(baos, object);
+ baos.close();
+ return baos.toByteArray();
+ }
+
+ private void serialize(OutputStream out, Object object, byte fid) throws Exception {
+ switch (FunctionId.values()[fid]) {
+ case REGISTER_PARTITION_PROVIDER:
+ RegisterPartitionProviderFunction.serialize(out, object);
+ return;
+
+ case REGISTER_PARTITION_REQUEST:
+ RegisterPartitionRequestFunction.serialize(out, object);
+ return;
+
+ case REPORT_PARTITION_AVAILABILITY:
+ ReportPartitionAvailabilityFunction.serialize(out, object);
+ return;
+ }
+ JavaSerializationBasedPayloadSerializerDeserializer.serialize(out, object);
+ }
+
+ private Object deserialize(byte fid, ByteBuffer buffer, int length) throws Exception {
+ switch (FunctionId.values()[fid]) {
+ case REGISTER_PARTITION_PROVIDER:
+ return RegisterPartitionProviderFunction.deserialize(buffer, length);
+
+ case REGISTER_PARTITION_REQUEST:
+ return RegisterPartitionRequestFunction.deserialize(buffer, length);
+
+ case REPORT_PARTITION_AVAILABILITY:
+ return ReportPartitionAvailabilityFunction.deserialize(buffer, length);
+ }
+
+ return javaSerde.deserializeObject(buffer, length);
+ }
+ }
+
+ private static PartitionId readPartitionId(DataInputStream dis) throws IOException {
+ long jobId = dis.readLong();
+ int cdid = dis.readInt();
+ int senderIndex = dis.readInt();
+ int receiverIndex = dis.readInt();
+ PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
+ return pid;
+ }
+
+ private static void writePartitionId(DataOutputStream dos, PartitionId pid) throws IOException {
+ dos.writeLong(pid.getJobId().getId());
+ dos.writeInt(pid.getConnectorDescriptorId().getId());
+ dos.writeInt(pid.getSenderIndex());
+ dos.writeInt(pid.getReceiverIndex());
+ }
+
+ private static TaskAttemptId readTaskAttemptId(DataInputStream dis) throws IOException {
+ int odid = dis.readInt();
+ int aid = dis.readInt();
+ int partition = dis.readInt();
+ int attempt = dis.readInt();
+ TaskAttemptId taId = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid),
+ partition), attempt);
+ return taId;
+ }
+
+ private static void writeTaskAttemptId(DataOutputStream dos, TaskAttemptId taId) throws IOException {
+ TaskId tid = taId.getTaskId();
+ ActivityId aid = tid.getActivityId();
+ OperatorDescriptorId odId = aid.getOperatorDescriptorId();
+ dos.writeInt(odId.getId());
+ dos.writeInt(aid.getLocalId());
+ dos.writeInt(tid.getPartition());
+ dos.writeInt(taId.getAttempt());
+ }
+
+ private static PartitionState readPartitionState(DataInputStream dis) throws IOException {
+ PartitionState state = PartitionState.values()[dis.readInt()];
+ return state;
+ }
+
+ private static void writePartitionState(DataOutputStream dos, PartitionState state) throws IOException {
+ dos.writeInt(state.ordinal());
+ }
+
+ private static NetworkAddress readNetworkAddress(DataInputStream dis) throws IOException {
+ int bLen = dis.readInt();
+ byte[] ipAddress = new byte[bLen];
+ dis.read(ipAddress);
+ int port = dis.readInt();
+ NetworkAddress networkAddress = new NetworkAddress(ipAddress, port);
+ return networkAddress;
+ }
+
+ private static void writeNetworkAddress(DataOutputStream dos, NetworkAddress networkAddress) throws IOException {
+ byte[] ipAddress = networkAddress.getIpAddress();
+ dos.writeInt(ipAddress.length);
+ dos.write(ipAddress);
+ dos.writeInt(networkAddress.getPort());
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
deleted file mode 100644
index 2b6792e..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.common.ipc;
-
-import java.io.Serializable;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
-import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
-import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
-import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
-import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
-
-public class ClusterControllerFunctions {
- public enum FunctionId {
- REGISTER_NODE,
- UNREGISTER_NODE,
- NOTIFY_TASK_COMPLETE,
- NOTIFY_TASK_FAILURE,
- NOTIFY_JOBLET_CLEANUP,
- NODE_HEARTBEAT,
- REPORT_PROFILE,
- REGISTER_PARTITION_PROVIDER,
- REGISTER_PARTITION_REQUEST,
- APPLICATION_STATE_CHANGE_RESPONSE,
- }
-
- public static abstract class Function implements Serializable {
- private static final long serialVersionUID = 1L;
-
- public abstract FunctionId getFunctionId();
- }
-
- public static class RegisterNodeFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final NodeRegistration reg;
-
- public RegisterNodeFunction(NodeRegistration reg) {
- this.reg = reg;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REGISTER_NODE;
- }
-
- public NodeRegistration getNodeRegistration() {
- return reg;
- }
- }
-
- public static class UnregisterNodeFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String nodeId;
-
- public UnregisterNodeFunction(String nodeId) {
- this.nodeId = nodeId;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.UNREGISTER_NODE;
- }
-
- public String getNodeId() {
- return nodeId;
- }
- }
-
- public static class NotifyTaskCompleteFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
- private final TaskAttemptId taskId;
- private final String nodeId;
- private final TaskProfile statistics;
-
- public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
- this.jobId = jobId;
- this.taskId = taskId;
- this.nodeId = nodeId;
- this.statistics = statistics;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.NOTIFY_TASK_COMPLETE;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public TaskAttemptId getTaskId() {
- return taskId;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public TaskProfile getStatistics() {
- return statistics;
- }
- }
-
- public static class NotifyTaskFailureFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
- private final TaskAttemptId taskId;
- private final String nodeId;
- private final String details;
-
- public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
- this.jobId = jobId;
- this.taskId = taskId;
- this.nodeId = nodeId;
- this.details = details;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.NOTIFY_TASK_FAILURE;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public TaskAttemptId getTaskId() {
- return taskId;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public String getDetails() {
- return details;
- }
- }
-
- public static class NotifyJobletCleanupFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
- private final String nodeId;
-
- public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
- this.jobId = jobId;
- this.nodeId = nodeId;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.NOTIFY_JOBLET_CLEANUP;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public String getNodeId() {
- return nodeId;
- }
- }
-
- public static class NodeHeartbeatFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String nodeId;
- private final HeartbeatData hbData;
-
- public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
- this.nodeId = nodeId;
- this.hbData = hbData;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.NODE_HEARTBEAT;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public HeartbeatData getHeartbeatData() {
- return hbData;
- }
- }
-
- public static class ReportProfileFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String nodeId;
- private final List<JobProfile> profiles;
-
- public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
- this.nodeId = nodeId;
- this.profiles = profiles;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REPORT_PROFILE;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public List<JobProfile> getProfiles() {
- return profiles;
- }
- }
-
- public static class RegisterPartitionProviderFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final PartitionDescriptor partitionDescriptor;
-
- public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
- this.partitionDescriptor = partitionDescriptor;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REGISTER_PARTITION_PROVIDER;
- }
-
- public PartitionDescriptor getPartitionDescriptor() {
- return partitionDescriptor;
- }
- }
-
- public static class RegisterPartitionRequestFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final PartitionRequest partitionRequest;
-
- public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
- this.partitionRequest = partitionRequest;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REGISTER_PARTITION_REQUEST;
- }
-
- public PartitionRequest getPartitionRequest() {
- return partitionRequest;
- }
- }
-
- public static class ApplicationStateChangeResponseFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String nodeId;
- private final String appName;
- private final ApplicationStatus status;
-
- public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
- this.nodeId = nodeId;
- this.appName = appName;
- this.status = status;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
- }
-
- public String getNodeId() {
- return nodeId;
- }
-
- public String getApplicationName() {
- return appName;
- }
-
- public ApplicationStatus getStatus() {
- return status;
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index b7dd0af..a0dabdd 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -37,13 +37,13 @@
@Override
public void registerNode(NodeRegistration reg) throws Exception {
- ClusterControllerFunctions.RegisterNodeFunction fn = new ClusterControllerFunctions.RegisterNodeFunction(reg);
+ CCNCFunctions.RegisterNodeFunction fn = new CCNCFunctions.RegisterNodeFunction(reg);
ipcHandle.send(-1, fn, null);
}
@Override
public void unregisterNode(String nodeId) throws Exception {
- ClusterControllerFunctions.UnregisterNodeFunction fn = new ClusterControllerFunctions.UnregisterNodeFunction(
+ CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(
nodeId);
ipcHandle.send(-1, fn, null);
}
@@ -51,56 +51,56 @@
@Override
public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
throws Exception {
- ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
+ CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(
jobId, taskId, nodeId, statistics);
ipcHandle.send(-1, fn, null);
}
@Override
public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
- ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
+ CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(
jobId, taskId, nodeId, details);
ipcHandle.send(-1, fn, null);
}
@Override
public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
- ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
+ CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(
jobId, nodeId);
ipcHandle.send(-1, fn, null);
}
@Override
public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
- ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
+ CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id,
hbData);
ipcHandle.send(-1, fn, null);
}
@Override
public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
- ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
+ CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id,
profiles);
ipcHandle.send(-1, fn, null);
}
@Override
public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
- ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
+ CCNCFunctions.RegisterPartitionProviderFunction fn = new CCNCFunctions.RegisterPartitionProviderFunction(
partitionDescriptor);
ipcHandle.send(-1, fn, null);
}
@Override
public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
- ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
+ CCNCFunctions.RegisterPartitionRequestFunction fn = new CCNCFunctions.RegisterPartitionRequestFunction(
partitionRequest);
ipcHandle.send(-1, fn, null);
}
@Override
public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception {
- ClusterControllerFunctions.ApplicationStateChangeResponseFunction fn = new ClusterControllerFunctions.ApplicationStateChangeResponseFunction(
+ CCNCFunctions.ApplicationStateChangeResponseFunction fn = new CCNCFunctions.ApplicationStateChangeResponseFunction(
nodeId, appName, status);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
deleted file mode 100644
index b72d4e5..0000000
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.common.ipc;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
-import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-
-public class NodeControllerFunctions {
- public enum FunctionId {
- NODE_REGISTRATION_RESULT,
- START_TASKS,
- ABORT_TASKS,
- CLEANUP_JOBLET,
- CREATE_APPLICATION,
- DESTROY_APPLICATION,
- REPORT_PARTITION_AVAILABILITY
- }
-
- public static abstract class Function implements Serializable {
- private static final long serialVersionUID = 1L;
-
- public abstract FunctionId getFunctionId();
- }
-
- public static class NodeRegistrationResult extends Function {
- private static final long serialVersionUID = 1L;
-
- private final NodeParameters params;
-
- private final Exception exception;
-
- public NodeRegistrationResult(NodeParameters params, Exception exception) {
- this.params = params;
- this.exception = exception;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.NODE_REGISTRATION_RESULT;
- }
-
- public NodeParameters getNodeParameters() {
- return params;
- }
-
- public Exception getException() {
- return exception;
- }
- }
-
- public static class StartTasksFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String appName;
- private final JobId jobId;
- private final byte[] planBytes;
- private final List<TaskAttemptDescriptor> taskDescriptors;
- private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
-
- public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
- List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
- this.appName = appName;
- this.jobId = jobId;
- this.planBytes = planBytes;
- this.taskDescriptors = taskDescriptors;
- this.connectorPolicies = connectorPolicies;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.START_TASKS;
- }
-
- public String getAppName() {
- return appName;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public byte[] getPlanBytes() {
- return planBytes;
- }
-
- public List<TaskAttemptDescriptor> getTaskDescriptors() {
- return taskDescriptors;
- }
-
- public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
- return connectorPolicies;
- }
- }
-
- public static class AbortTasksFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
- private final List<TaskAttemptId> tasks;
-
- public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
- this.jobId = jobId;
- this.tasks = tasks;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.ABORT_TASKS;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public List<TaskAttemptId> getTasks() {
- return tasks;
- }
- }
-
- public static class CleanupJobletFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final JobId jobId;
- private final JobStatus status;
-
- public CleanupJobletFunction(JobId jobId, JobStatus status) {
- this.jobId = jobId;
- this.status = status;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.CLEANUP_JOBLET;
- }
-
- public JobId getJobId() {
- return jobId;
- }
-
- public JobStatus getStatus() {
- return status;
- }
- }
-
- public static class CreateApplicationFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String appName;
- private final boolean deployHar;
- private final byte[] serializedDistributedState;
-
- public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
- this.appName = appName;
- this.deployHar = deployHar;
- this.serializedDistributedState = serializedDistributedState;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.CREATE_APPLICATION;
- }
-
- public String getAppName() {
- return appName;
- }
-
- public boolean isDeployHar() {
- return deployHar;
- }
-
- public byte[] getSerializedDistributedState() {
- return serializedDistributedState;
- }
- }
-
- public static class DestroyApplicationFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final String appName;
-
- public DestroyApplicationFunction(String appName) {
- this.appName = appName;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.DESTROY_APPLICATION;
- }
-
- public String getAppName() {
- return appName;
- }
- }
-
- public static class ReportPartitionAvailabilityFunction extends Function {
- private static final long serialVersionUID = 1L;
-
- private final PartitionId pid;
- private final NetworkAddress networkAddress;
-
- public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
- this.pid = pid;
- this.networkAddress = networkAddress;
- }
-
- @Override
- public FunctionId getFunctionId() {
- return FunctionId.REPORT_PARTITION_AVAILABILITY;
- }
-
- public PartitionId getPartitionId() {
- return pid;
- }
-
- public NetworkAddress getNetworkAddress() {
- return networkAddress;
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 474fb93..961a9e3 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -38,20 +38,20 @@
@Override
public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
- NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
+ CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(appName, jobId,
planBytes, taskDescriptors, connectorPolicies);
ipcHandle.send(-1, stf, null);
}
@Override
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
- NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
+ CCNCFunctions.AbortTasksFunction atf = new CCNCFunctions.AbortTasksFunction(jobId, tasks);
ipcHandle.send(-1, atf, null);
}
@Override
public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
- NodeControllerFunctions.CleanupJobletFunction cjf = new NodeControllerFunctions.CleanupJobletFunction(jobId,
+ CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId,
status);
ipcHandle.send(-1, cjf, null);
}
@@ -59,21 +59,21 @@
@Override
public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
throws Exception {
- NodeControllerFunctions.CreateApplicationFunction caf = new NodeControllerFunctions.CreateApplicationFunction(
+ CCNCFunctions.CreateApplicationFunction caf = new CCNCFunctions.CreateApplicationFunction(
appName, deployHar, serializedDistributedState);
ipcHandle.send(-1, caf, null);
}
@Override
public void destroyApplication(String appName) throws Exception {
- NodeControllerFunctions.DestroyApplicationFunction daf = new NodeControllerFunctions.DestroyApplicationFunction(
+ CCNCFunctions.DestroyApplicationFunction daf = new CCNCFunctions.DestroyApplicationFunction(
appName);
ipcHandle.send(-1, daf, null);
}
@Override
public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
- NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = new NodeControllerFunctions.ReportPartitionAvailabilityFunction(
+ CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = new CCNCFunctions.ReportPartitionAvailabilityFunction(
pid, networkAddress);
ipcHandle.send(-1, rpaf, null);
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index ba1d970..0f512a7 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -50,8 +50,8 @@
import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
-import edu.uci.ics.hyracks.control.common.ipc.NodeControllerFunctions;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.work.FutureValue;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
@@ -124,7 +124,8 @@
id = ncConfig.nodeId;
executor = Executors.newCachedThreadPool();
NodeControllerIPCI ipci = new NodeControllerIPCI();
- ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci);
+ ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
+ new CCNCFunctions.SerializerDeserializer());
this.ctx = new RootHyracksContext(ncConfig.frameSize, new IOManager(getDevices(ncConfig.ioDevices), executor));
if (id == null) {
throw new Exception("id not set");
@@ -347,49 +348,49 @@
private final class NodeControllerIPCI implements IIPCI {
@Override
public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
- NodeControllerFunctions.Function fn = (NodeControllerFunctions.Function) payload;
+ CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
switch (fn.getFunctionId()) {
case START_TASKS: {
- NodeControllerFunctions.StartTasksFunction stf = (NodeControllerFunctions.StartTasksFunction) fn;
+ CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies()));
return;
}
case ABORT_TASKS: {
- NodeControllerFunctions.AbortTasksFunction atf = (NodeControllerFunctions.AbortTasksFunction) fn;
+ CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
return;
}
case CLEANUP_JOBLET: {
- NodeControllerFunctions.CleanupJobletFunction cjf = (NodeControllerFunctions.CleanupJobletFunction) fn;
+ CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
return;
}
case CREATE_APPLICATION: {
- NodeControllerFunctions.CreateApplicationFunction caf = (NodeControllerFunctions.CreateApplicationFunction) fn;
+ CCNCFunctions.CreateApplicationFunction caf = (CCNCFunctions.CreateApplicationFunction) fn;
queue.schedule(new CreateApplicationWork(NodeControllerService.this, caf.getAppName(), caf
.isDeployHar(), caf.getSerializedDistributedState()));
return;
}
case DESTROY_APPLICATION: {
- NodeControllerFunctions.DestroyApplicationFunction daf = (NodeControllerFunctions.DestroyApplicationFunction) fn;
+ CCNCFunctions.DestroyApplicationFunction daf = (CCNCFunctions.DestroyApplicationFunction) fn;
queue.schedule(new DestroyApplicationWork(NodeControllerService.this, daf.getAppName()));
return;
}
case REPORT_PARTITION_AVAILABILITY: {
- NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = (NodeControllerFunctions.ReportPartitionAvailabilityFunction) fn;
+ CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
.getPartitionId(), rpaf.getNetworkAddress()));
return;
}
case NODE_REGISTRATION_RESULT: {
- NodeControllerFunctions.NodeRegistrationResult nrrf = (NodeControllerFunctions.NodeRegistrationResult) fn;
+ CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
return;
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index f94be22..1a0a820 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -60,7 +60,7 @@
public void start() throws IOException {
md.start();
InetSocketAddress sockAddr = md.getLocalAddress();
- networkAddress = new NetworkAddress(sockAddr.getAddress(), sockAddr.getPort());
+ networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
}
public NetworkAddress getNetworkAddress() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index ce3f4e0..9734567 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.nc.work;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.logging.Logger;
@@ -48,8 +49,8 @@
Joblet ji = jobletMap.get(pid.getJobId());
if (ji != null) {
PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
- ncs.getNetworkManager(), new InetSocketAddress(networkAddress.getIpAddress(),
- networkAddress.getPort()), pid, 5));
+ ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+ .getIpAddress()), networkAddress.getPort()), pid, 5));
ji.reportPartitionAvailability(channel);
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 4ffc060..37154bc 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -28,21 +28,33 @@
private static final long serialVersionUID = 1L;
protected final ConnectorDescriptorId id;
+ protected String displayName;
+
public AbstractConnectorDescriptor(JobSpecification spec) {
this.id = spec.createConnectorDescriptor();
spec.getConnectorMap().put(id, this);
+ displayName = getClass().getName() + "[" + id + "]";
}
public ConnectorDescriptorId getConnectorId() {
return id;
}
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ public void setDisplayName(String displayName) {
+ this.displayName = displayName;
+ }
+
@Override
public JSONObject toJSON() throws JSONException {
JSONObject jconn = new JSONObject();
jconn.put("id", getConnectorId().getId());
jconn.put("java-class", getClass().getName());
+ jconn.put("display-name", displayName);
return jconn;
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 1bb13ca..1a60290 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -38,12 +38,15 @@
protected final int outputArity;
+ protected String displayName;
+
public AbstractOperatorDescriptor(JobSpecification spec, int inputArity, int outputArity) {
odId = spec.createOperatorDescriptorId();
this.inputArity = inputArity;
this.outputArity = outputArity;
recordDescriptors = new RecordDescriptor[outputArity];
spec.getOperatorMap().put(getOperatorId(), this);
+ displayName = getClass().getName() + "[" + odId + "]";
}
@Override
@@ -66,6 +69,14 @@
return recordDescriptors;
}
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ public void setDisplayName(String displayName) {
+ this.displayName = displayName;
+ }
+
@Override
public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
ICCApplicationContext appCtx) {
@@ -79,6 +90,7 @@
jop.put("java-class", getClass().getName());
jop.put("in-arity", getInputArity());
jop.put("out-arity", getOutputArity());
+ jop.put("display-name", displayName);
return jop;
}
}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java
new file mode 100644
index 0000000..62648ff
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/api/IPayloadSerializerDeserializer.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.ipc.api;
+
+import java.nio.ByteBuffer;
+
+public interface IPayloadSerializerDeserializer {
+ public Object deserializeObject(ByteBuffer buffer, int length) throws Exception;
+
+ public Exception deserializeException(ByteBuffer buffer, int length) throws Exception;
+
+ public byte[] serializeObject(Object object) throws Exception;
+
+ public byte[] serializeException(Exception object) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
index eb9cbd3..053ac6b 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCConnectionManager.java
@@ -208,7 +208,7 @@
SelectionKey key = handle.getKey();
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else {
- if (buffer.position() == 0) {
+ if (!buffer.hasRemaining()) {
handle.resizeOutBuffer();
continue;
}
@@ -251,7 +251,10 @@
} else if (!writeBuffer.hasRemaining()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
- handle.clearFull();
+ if (handle.full()) {
+ handle.clearFull();
+ selector.wakeup();
+ }
} else if (key.isAcceptable()) {
assert sc == serverSocketChannel;
SocketChannel channel = serverSocketChannel.accept();
@@ -273,7 +276,7 @@
}
}
}
- } catch (IOException e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
index f3c05c6..3d3bc7a 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCHandle.java
@@ -54,6 +54,10 @@
return remoteAddress;
}
+ IPCSystem getIPCSystem() {
+ return system;
+ }
+
void setRemoteAddress(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
@@ -161,14 +165,12 @@
inBuffer.flip();
ByteBuffer readBuffer = ByteBuffer.allocate(inBuffer.capacity() * 2);
readBuffer.put(inBuffer);
- readBuffer.compact();
inBuffer = readBuffer;
}
void resizeOutBuffer() {
ByteBuffer writeBuffer = ByteBuffer.allocate(outBuffer.capacity() * 2);
writeBuffer.put(outBuffer);
- writeBuffer.compact();
writeBuffer.flip();
outBuffer = writeBuffer;
}
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
index 5f60c6d..6c9c82c 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/IPCSystem.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
public class IPCSystem {
@@ -27,15 +28,15 @@
private final IIPCI ipci;
+ private final IPayloadSerializerDeserializer serde;
+
private final AtomicLong midFactory;
- public IPCSystem(InetSocketAddress socketAddress) throws IOException {
- this(socketAddress, null);
- }
-
- public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci) throws IOException {
+ public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, IPayloadSerializerDeserializer serde)
+ throws IOException {
cMgr = new IPCConnectionManager(this, socketAddress);
this.ipci = ipci;
+ this.serde = serde;
midFactory = new AtomicLong();
}
@@ -57,6 +58,10 @@
}
}
+ IPayloadSerializerDeserializer getSerializerDeserializer() {
+ return serde;
+ }
+
long createMessageId() {
return midFactory.incrementAndGet();
}
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
new file mode 100644
index 0000000..fdf8e92
--- /dev/null
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
@@ -0,0 +1,53 @@
+package edu.uci.ics.hyracks.ipc.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+
+public class JavaSerializationBasedPayloadSerializerDeserializer implements IPayloadSerializerDeserializer {
+ @Override
+ public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+ return deserialize(buffer, length);
+ }
+
+ @Override
+ public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+ return (Exception) deserialize(buffer, length);
+ }
+
+ @Override
+ public byte[] serializeObject(Object object) throws Exception {
+ return serialize(object);
+ }
+
+ @Override
+ public byte[] serializeException(Exception exception) throws Exception {
+ return serialize(exception);
+ }
+
+ public static void serialize(OutputStream out, Object object) throws Exception {
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ oos.writeObject(object);
+ oos.flush();
+ }
+
+ private Object deserialize(ByteBuffer buffer, int length) throws Exception {
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
+ length));
+ Object object = ois.readObject();
+ ois.close();
+ return object;
+ }
+
+ private byte[] serialize(Object object) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serialize(baos, object);
+ baos.close();
+ return baos.toByteArray();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
index 6bff56f..6bb3156 100644
--- a/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
+++ b/hyracks-ipc/src/main/java/edu/uci/ics/hyracks/ipc/impl/Message.java
@@ -14,13 +14,10 @@
*/
package edu.uci.ics.hyracks.ipc.impl;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+
class Message {
private static final int MSG_SIZE_SIZE = 4;
@@ -92,29 +89,26 @@
return buffer.remaining() >= msgSize + MSG_SIZE_SIZE;
}
- void read(ByteBuffer buffer) throws IOException, ClassNotFoundException {
+ void read(ByteBuffer buffer) throws Exception {
assert hasMessage(buffer);
int msgSize = buffer.getInt();
messageId = buffer.getLong();
requestMessageId = buffer.getLong();
flag = buffer.get();
int finalPosition = buffer.position() + msgSize - HEADER_SIZE;
+ int length = msgSize - HEADER_SIZE;
try {
- ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(),
- msgSize - HEADER_SIZE));
- payload = ois.readObject();
- ois.close();
+ IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
+ payload = flag == ERROR ? serde.deserializeException(buffer, length) : serde.deserializeObject(buffer,
+ length);
} finally {
buffer.position(finalPosition);
}
}
- boolean write(ByteBuffer buffer) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(payload);
- oos.close();
- byte[] bytes = baos.toByteArray();
+ boolean write(ByteBuffer buffer) throws Exception {
+ IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
+ byte[] bytes = flag == ERROR ? serde.serializeException((Exception) payload) : serde.serializeObject(payload);
if (buffer.remaining() >= MSG_SIZE_SIZE + HEADER_SIZE + bytes.length) {
buffer.putInt(HEADER_SIZE + bytes.length);
buffer.putLong(messageId);
diff --git a/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
index dac93dd..5b2f660 100644
--- a/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks-ipc/src/test/java/edu/uci/ics/hyracks/ipc/tests/IPCTest.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.ipc.api.RPCInterface;
import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
public class IPCTest {
@Test
@@ -80,10 +81,12 @@
});
}
};
- return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci);
+ return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci,
+ new JavaSerializationBasedPayloadSerializerDeserializer());
}
private IPCSystem createClientIPCSystem(RPCInterface rpci) throws IOException {
- return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci);
+ return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci,
+ new JavaSerializationBasedPayloadSerializerDeserializer());
}
}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 99140db..7f6853b 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -21,12 +21,16 @@
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.exceptions.NetException;
public class ChannelControlBlock {
+ private static final Logger LOGGER = Logger.getLogger(ChannelControlBlock.class.getName());
+
private final ChannelSet cSet;
private final int channelId;
@@ -166,6 +170,9 @@
public void close() {
synchronized (ChannelControlBlock.this) {
if (eos) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Received duplicate close() on channel: " + channelId);
+ }
return;
}
eos = true;
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 238f2ee..1d71fac 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -178,6 +178,7 @@
if (!writerState.performPendingWrite(sc)) {
return;
}
+ pendingWriteEventsCounter.decrement();
}
int numCycles;
@@ -239,9 +240,11 @@
}
writeCCB.write(writerState);
if (writerState.writePending()) {
+ pendingWriteEventsCounter.increment();
if (!writerState.performPendingWrite(sc)) {
return;
}
+ pendingWriteEventsCounter.decrement();
}
}
}
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
index 37641cb..d73ba21 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -70,7 +70,7 @@
ioThreads[targetThread].initiateConnection(remoteAddress);
}
- private void addIncomingConnection(SocketChannel channel) {
+ private void distributeIncomingConnection(SocketChannel channel) {
int targetThread = getNextThread();
ioThreads[targetThread].addIncomingConnection(channel);
}
@@ -80,24 +80,23 @@
}
private class IOThread extends Thread {
- private final List<InetSocketAddress>[] pendingConnections;
+ private final List<InetSocketAddress> pendingConnections;
- private final List<SocketChannel>[] incomingConnections;
+ private final List<InetSocketAddress> workingPendingConnections;
- private int writerIndex;
+ private final List<SocketChannel> incomingConnections;
- private int readerIndex;
+ private final List<SocketChannel> workingIncomingConnections;
private Selector selector;
public IOThread() throws IOException {
super("TCPEndpoint IO Thread");
setPriority(MAX_PRIORITY);
- this.pendingConnections = new List[] { new ArrayList<InetSocketAddress>(),
- new ArrayList<InetSocketAddress>() };
- this.incomingConnections = new List[] { new ArrayList<SocketChannel>(), new ArrayList<SocketChannel>() };
- writerIndex = 0;
- readerIndex = 1;
+ this.pendingConnections = new ArrayList<InetSocketAddress>();
+ this.workingPendingConnections = new ArrayList<InetSocketAddress>();
+ this.incomingConnections = new ArrayList<SocketChannel>();
+ this.workingIncomingConnections = new ArrayList<SocketChannel>();
selector = Selector.open();
}
@@ -106,9 +105,9 @@
while (true) {
try {
int n = selector.select();
- swapReadersAndWriters();
- if (!pendingConnections[readerIndex].isEmpty()) {
- for (InetSocketAddress address : pendingConnections[readerIndex]) {
+ collectOutstandingWork();
+ if (!workingPendingConnections.isEmpty()) {
+ for (InetSocketAddress address : workingPendingConnections) {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
if (!channel.connect(address)) {
@@ -118,10 +117,10 @@
createConnection(key, channel);
}
}
- pendingConnections[readerIndex].clear();
+ workingPendingConnections.clear();
}
- if (!incomingConnections[readerIndex].isEmpty()) {
- for (SocketChannel channel : incomingConnections[readerIndex]) {
+ if (!workingIncomingConnections.isEmpty()) {
+ for (SocketChannel channel : workingIncomingConnections) {
channel.configureBlocking(false);
SelectionKey sKey = channel.register(selector, 0);
TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector);
@@ -130,7 +129,7 @@
connectionListener.acceptedConnection(connection);
}
}
- incomingConnections[readerIndex].clear();
+ workingIncomingConnections.clear();
}
if (n > 0) {
Iterator<SelectionKey> i = selector.selectedKeys().iterator();
@@ -148,7 +147,7 @@
if (key.isAcceptable()) {
assert sc == serverSocketChannel;
SocketChannel channel = serverSocketChannel.accept();
- addIncomingConnection(channel);
+ distributeIncomingConnection(channel);
} else if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) sc;
if (channel.finishConnect()) {
@@ -171,12 +170,12 @@
}
synchronized void initiateConnection(InetSocketAddress remoteAddress) {
- pendingConnections[writerIndex].add(remoteAddress);
+ pendingConnections.add(remoteAddress);
selector.wakeup();
}
synchronized void addIncomingConnection(SocketChannel channel) {
- incomingConnections[writerIndex].add(channel);
+ incomingConnections.add(channel);
selector.wakeup();
}
@@ -185,10 +184,15 @@
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
- private synchronized void swapReadersAndWriters() {
- int temp = readerIndex;
- readerIndex = writerIndex;
- writerIndex = temp;
+ private synchronized void collectOutstandingWork() {
+ if (!pendingConnections.isEmpty()) {
+ workingPendingConnections.addAll(pendingConnections);
+ pendingConnections.clear();
+ }
+ if (!incomingConnections.isEmpty()) {
+ workingIncomingConnections.addAll(incomingConnections);
+ incomingConnections.clear();
+ }
}
}
}
\ No newline at end of file