Merged trunk -r 363:437 into branch
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@468 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-cc/.settings/org.eclipse.jdt.core.prefs b/hyracks-control-cc/.settings/org.eclipse.jdt.core.prefs
index 1b67f41..375e12e 100644
--- a/hyracks-control-cc/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-control-cc/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Fri Jul 30 17:52:26 PDT 2010
+#Fri May 20 19:34:07 PDT 2011
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks-control-cc/pom.xml b/hyracks-control-cc/pom.xml
index c67e7a0..968aa3f 100644
--- a/hyracks-control-cc/pom.xml
+++ b/hyracks-control-cc/pom.xml
@@ -2,12 +2,12 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-cc</artifactId>
- <version>0.1.4</version>
+ <version>0.1.5</version>
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks</artifactId>
- <version>0.1.4</version>
+ <version>0.1.5</version>
</parent>
<build>
@@ -27,7 +27,7 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-control-common</artifactId>
- <version>0.1.4</version>
+ <version>0.1.5</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index a4aa482..e5efe92 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -23,6 +23,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
@@ -33,9 +34,11 @@
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.control.CCConfig;
import edu.uci.ics.hyracks.api.control.IClusterController;
import edu.uci.ics.hyracks.api.control.INodeController;
+import edu.uci.ics.hyracks.api.control.NCConfig;
import edu.uci.ics.hyracks.api.control.NodeParameters;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -76,6 +79,8 @@
private final Map<String, NodeControllerState> nodeRegistry;
+ private final Map<String, Set<String>> ipAddressNodeNameMap;
+
private final Map<String, CCApplicationContext> applications;
private final ServerContext serverCtx;
@@ -94,9 +99,12 @@
private final Timer timer;
+ private final ICCContext ccContext;
+
public ClusterControllerService(CCConfig ccConfig) throws Exception {
this.ccConfig = ccConfig;
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+ ipAddressNodeNameMap = new HashMap<String, Set<String>>();
applications = new Hashtable<String, CCApplicationContext>();
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(
ClusterControllerService.class.getName()));
@@ -106,6 +114,12 @@
jobQueue = new JobQueue();
scheduler = new NaiveScheduler(this);
this.timer = new Timer(true);
+ ccContext = new ICCContext() {
+ @Override
+ public Map<String, Set<String>> getIPAddressNodeMap() {
+ return ipAddressNodeNameMap;
+ }
+ };
}
@Override
@@ -153,6 +167,10 @@
return nodeRegistry;
}
+ public Map<String, Set<String>> getIPAddressNodeNameMap() {
+ return ipAddressNodeNameMap;
+ }
+
public CCConfig getConfig() {
return ccConfig;
}
@@ -169,7 +187,8 @@
@Override
public NodeParameters registerNode(INodeController nodeController) throws Exception {
String id = nodeController.getId();
- NodeControllerState state = new NodeControllerState(nodeController);
+ NCConfig ncConfig = nodeController.getConfiguration();
+ NodeControllerState state = new NodeControllerState(nodeController, ncConfig);
jobQueue.scheduleAndSync(new RegisterNodeEvent(this, id, state));
nodeController.notifyRegistration(this);
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
@@ -242,7 +261,7 @@
if (applications.containsKey(appName)) {
throw new HyracksException("Duplicate application with name: " + appName + " being created.");
}
- CCApplicationContext appCtx = new CCApplicationContext(serverCtx, appName);
+ CCApplicationContext appCtx = new CCApplicationContext(serverCtx, ccContext, appName);
applications.put(appName, appCtx);
}
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index e69281b..40eb9ae 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -5,16 +5,20 @@
import java.util.UUID;
import edu.uci.ics.hyracks.api.control.INodeController;
+import edu.uci.ics.hyracks.api.control.NCConfig;
public class NodeControllerState {
private final INodeController nodeController;
+ private final NCConfig ncConfig;
+
private final Set<UUID> activeJobIds;
private int lastHeartbeatDuration;
- public NodeControllerState(INodeController nodeController) {
+ public NodeControllerState(INodeController nodeController, NCConfig ncConfig) {
this.nodeController = nodeController;
+ this.ncConfig = ncConfig;
activeJobIds = new HashSet<UUID>();
}
@@ -34,6 +38,10 @@
return nodeController;
}
+ public NCConfig getNCConfig() {
+ return ncConfig;
+ }
+
public Set<UUID> getActiveJobIds() {
return activeJobIds;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 5ca0269..96e29d3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -8,6 +8,7 @@
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
@@ -17,12 +18,15 @@
import edu.uci.ics.hyracks.control.common.context.ServerContext;
public class CCApplicationContext extends ApplicationContext implements ICCApplicationContext {
+ private final ICCContext ccContext;
+
private IJobSpecificationFactory jobSpecFactory;
private List<IJobLifecycleListener> jobLifecycleListeners;
- public CCApplicationContext(ServerContext serverCtx, String appName) throws IOException {
+ public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext, String appName) throws IOException {
super(serverCtx, appName);
+ this.ccContext = ccContext;
jobSpecFactory = DeserializingJobSpecificationFactory.INSTANCE;
jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
}
@@ -33,6 +37,10 @@
bootstrap.start();
}
+ public ICCContext getCCContext() {
+ return ccContext;
+ }
+
@Override
public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory) {
this.jobSpecFactory = jobSpecFactory;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
index 3b03d25..d7f244e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
@@ -60,7 +60,7 @@
throw new HyracksException("No application with id " + appName + " found");
}
JobSpecification spec = appCtx.createJobSpecification(jobId, jobSpec);
- JobRun run = plan(jobId, spec, jobFlags);
+ JobRun run = plan(jobId, spec, appCtx, jobFlags);
run.setStatus(JobStatus.INITIALIZED);
ccs.getRunMap().put(jobId, run);
@@ -71,7 +71,8 @@
return jobId;
}
- private JobRun plan(UUID jobId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ private JobRun plan(UUID jobId, JobSpecification jobSpec, final CCApplicationContext appCtx,
+ EnumSet<JobFlag> jobFlags) throws Exception {
final JobPlanBuilder builder = new JobPlanBuilder();
builder.init(appName, jobId, jobSpec, jobFlags);
PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
@@ -92,13 +93,13 @@
PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
@Override
public void visit(IOperatorDescriptor op) {
- op.contributeSchedulingConstraints(acceptor, plan);
+ op.contributeSchedulingConstraints(acceptor, plan, appCtx);
}
});
PlanUtils.visit(jobSpec, new IConnectorDescriptorVisitor() {
@Override
public void visit(IConnectorDescriptor conn) {
- conn.contributeSchedulingConstraints(acceptor, plan);
+ conn.contributeSchedulingConstraints(acceptor, plan, appCtx);
}
});
contributedConstraints.addAll(jobSpec.getUserConstraints());
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
index 0754dc1..3df302f 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
@@ -14,7 +14,9 @@
*/
package edu.uci.ics.hyracks.control.cc.job.manager.events;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -38,5 +40,13 @@
throw new Exception("Node with this name already registered.");
}
nodeMap.put(nodeId, state);
+ Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
+ String ipAddress = state.getNCConfig().dataIPAddress;
+ Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
+ if (nodes == null) {
+ nodes = new HashSet<String>();
+ ipAddressNodeNameMap.put(ipAddress, nodes);
+ }
+ nodes.add(nodeId);
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index 38fb7ae..dca0dee 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -44,6 +44,7 @@
LOGGER.info(e.getKey() + " considered dead");
}
}
+ Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
for (String deadNode : deadNodes) {
NodeControllerState state = nodeMap.remove(deadNode);
for (final UUID jid : state.getActiveJobIds()) {
@@ -52,6 +53,13 @@
LOGGER.info("Aborting: " + jid);
ccs.getJobQueue().schedule(new JobAbortEvent(ccs, jid, lastAttempt));
}
+ String ipAddress = state.getNCConfig().dataIPAddress;
+ Set<String> ipNodes = ipAddressNodeNameMap.get(ipAddress);
+ if (ipNodes != null) {
+ if (ipNodes.remove(deadNode) && ipNodes.isEmpty()) {
+ ipAddressNodeNameMap.remove(ipAddress);
+ }
+ }
}
}
}
\ No newline at end of file