Added JOL code for starting stagelets
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@16 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
index c271356..e7152f9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
@@ -16,28 +16,29 @@
import java.rmi.Remote;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobStage;
import edu.uci.ics.hyracks.config.NCConfig;
public interface INodeController extends Remote {
public String getId() throws Exception;
-
+
public NCConfig getConfiguration() throws Exception;
public NodeCapability getNodeCapability() throws Exception;
- public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, JobStage stage)
- throws Exception;
+ public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId,
+ Set<ActivityNodeId> activities) throws Exception;
- public void initializeJobletPhase2(UUID jobId, JobPlan plan, JobStage stage,
- Map<PortInstanceId, Endpoint> globalPortMap) throws Exception;
+ public void initializeJobletPhase2(UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> activities,
+ Map<PortInstanceId, Endpoint> globalPortMap) throws Exception;
- public void commitJobletInitialization(UUID jobId, JobPlan plan, JobStage stage) throws Exception;
+ public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception;
public void cleanUpJob(UUID jobId) throws Exception;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
index 9b2fc26..dbeb301 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -22,13 +22,16 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
+import java.util.Vector;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -43,10 +46,14 @@
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
+import edu.uci.ics.hyracks.api.comm.Endpoint;
import edu.uci.ics.hyracks.api.controller.IClusterController;
import edu.uci.ics.hyracks.api.controller.INodeController;
import edu.uci.ics.hyracks.api.controller.NodeParameters;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
@@ -78,8 +85,8 @@
this.ccConfig = ccConfig;
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
if (ccConfig.useJOL) {
- jolRuntime = (Runtime) Runtime.create(Runtime.DEBUG_WATCH, System.err);
- jobManager = new JOLJobManagerImpl(jolRuntime);
+ jolRuntime = (Runtime) Runtime.create(Runtime.DEBUG_ALL, System.err);
+ jobManager = new JOLJobManagerImpl(this, jolRuntime);
} else {
jobManager = new JobManagerImpl(this);
}
@@ -267,4 +274,159 @@
}
}
}
+
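+    /**
+     * A single remote call against one node controller; the target node is
+     * named by id and resolved through the node registry.
+     */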
+ interface RemoteOp<T> {
+ public String getNodeId();
+
+ public T execute(INodeController node) throws Exception;
+ }
+
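+    /** Folds the results of a set of remote calls into a single value. */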
+ interface Accumulator<T, R> {
+ public void accumulate(T o);
+
+ public R getResult();
+ }
+
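+    /**
+     * Fans the given operations out to their node controllers in parallel on
+     * the service executor. Each operation acquires one semaphore permit up
+     * front and releases it on completion, so the final bulk acquire blocks
+     * until every remote call has finished. Results are merged through the
+     * optional accumulator; the first remote failure is rethrown once all
+     * calls have completed.
+     */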
+ <T, R> R runRemote(final RemoteOp<T>[] remoteOps, final Accumulator<T, R> accumulator) throws Exception {
+ final Semaphore installComplete = new Semaphore(remoteOps.length);
+ final List<Exception> errors = new Vector<Exception>();
+ for (final RemoteOp<T> remoteOp : remoteOps) {
+ final INodeController node = lookupNode(remoteOp.getNodeId()).getNodeController();
+
+ installComplete.acquire();
+ Runnable remoteRunner = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ T t = remoteOp.execute(node);
+ if (accumulator != null) {
+ synchronized (accumulator) {
+ accumulator.accumulate(t);
+ }
+ }
+ } catch (Exception e) {
+ errors.add(e);
+ } finally {
+ installComplete.release();
+ }
+ }
+ };
+
+ getExecutor().execute(remoteRunner);
+ }
+ installComplete.acquire(remoteOps.length);
+ if (!errors.isEmpty()) {
+ throw errors.get(0);
+ }
+ return accumulator == null ? null : accumulator.getResult();
+ }
+
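+    /**
+     * Phase 1: creates the stagelet for the stage on a node and returns the
+     * endpoints of the input ports it exposes.
+     */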
+ static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
+ private String nodeId;
+ private UUID jobId;
+ private JobPlan plan;
+ private UUID stageId;
+ private Set<ActivityNodeId> tasks;
+
+ public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> tasks) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ this.plan = plan;
+ this.stageId = stageId;
+ this.tasks = tasks;
+ }
+
+ @Override
+ public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
+ return node.initializeJobletPhase1(jobId, plan, stageId, tasks);
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " Distribution Phase 1";
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
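+    /**
+     * Phase 2: hands the merged global port map to each node so send-side
+     * writers can connect to the consumer endpoints created in phase 1.
+     */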
+ static class Phase2Installer implements RemoteOp<Void> {
+ private String nodeId;
+ private UUID jobId;
+ private JobPlan plan;
+ private UUID stageId;
+ private Set<ActivityNodeId> tasks;
+ private Map<PortInstanceId, Endpoint> globalPortMap;
+
+ public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> tasks,
+ Map<PortInstanceId, Endpoint> globalPortMap) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ this.plan = plan;
+ this.stageId = stageId;
+ this.tasks = tasks;
+ this.globalPortMap = globalPortMap;
+ }
+
+ @Override
+ public Void execute(INodeController node) throws Exception {
+ node.initializeJobletPhase2(jobId, plan, stageId, tasks, globalPortMap);
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " Distribution Phase 2";
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
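+    /**
+     * Phase 3: commits joblet initialization; the node stops accepting new
+     * connections on the stage's endpoints.
+     */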
+ static class Phase3Installer implements RemoteOp<Void> {
+ private String nodeId;
+ private UUID jobId;
+ private UUID stageId;
+
+ public Phase3Installer(String nodeId, UUID jobId, UUID stageId) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ this.stageId = stageId;
+ }
+
+ @Override
+ public Void execute(INodeController node) throws Exception {
+ node.commitJobletInitialization(jobId, stageId);
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " Distribution Phase 3";
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
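+    /**
+     * Merges the per-node port maps returned by phase 1 into a single global
+     * port map.
+     */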
+ static class PortMapMergingAccumulator implements
+ Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
+ Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
+
+ @Override
+ public void accumulate(Map<PortInstanceId, Endpoint> o) {
+ portMap.putAll(o);
+ }
+
+ @Override
+ public Map<PortInstanceId, Endpoint> getResult() {
+ return portMap;
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 6e7cf74..3c39379 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -16,14 +16,20 @@
import java.util.EnumSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import jol.core.Runtime;
import jol.types.basic.BasicTupleSet;
import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
import jol.types.table.BasicTable;
import jol.types.table.Key;
import jol.types.table.TableName;
+import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.Direction;
@@ -32,7 +38,9 @@
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
@@ -43,12 +51,16 @@
private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg";
+ private final ClusterControllerService ccs;
+
private final Runtime jolRuntime;
private final JobTable jobTable;
private final OperatorDescriptorTable odTable;
+ private final OperatorLocationTable olTable;
+
private final ConnectorDescriptorTable cdTable;
private final ActivityNodeTable anTable;
@@ -57,21 +69,84 @@
private final ActivityBlockedTable abTable;
- public JOLJobManagerImpl(Runtime jolRuntime) throws Exception {
+ private final JobStartTable jobStartTable;
+
+ private final StartMessageTable startMessageTable;
+
+ public JOLJobManagerImpl(final ClusterControllerService ccs, Runtime jolRuntime) throws Exception {
+ this.ccs = ccs;
this.jolRuntime = jolRuntime;
this.jobTable = new JobTable(jolRuntime);
this.odTable = new OperatorDescriptorTable(jolRuntime);
+ this.olTable = new OperatorLocationTable(jolRuntime);
this.cdTable = new ConnectorDescriptorTable(jolRuntime);
this.anTable = new ActivityNodeTable(jolRuntime);
this.acTable = new ActivityConnectionTable(jolRuntime);
this.abTable = new ActivityBlockedTable(jolRuntime);
+ this.jobStartTable = new JobStartTable(jolRuntime);
+ this.startMessageTable = new StartMessageTable(jolRuntime);
jolRuntime.catalog().register(jobTable);
jolRuntime.catalog().register(odTable);
+ jolRuntime.catalog().register(olTable);
jolRuntime.catalog().register(cdTable);
jolRuntime.catalog().register(anTable);
jolRuntime.catalog().register(acTable);
jolRuntime.catalog().register(abTable);
+ jolRuntime.catalog().register(jobStartTable);
+ jolRuntime.catalog().register(startMessageTable);
+
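+        /*
+         * Insertions into startmessage (derived by the scheduler program)
+         * drive stagelet installation: phase 1 creates the stagelets on each
+         * participating node and collects their port endpoints, phase 2 hands
+         * every node the merged global port map, and phase 3 commits.
+         */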
+ startMessageTable.register(new StartMessageTable.Callback() {
+ @Override
+ public void deletion(TupleSet tuples) {
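+                // No action on deletion; start messages only trigger work on insertion.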
+
+ }
+
+ @Override
+ public void insertion(TupleSet tuples) {
+ try {
+ synchronized (JOLJobManagerImpl.this) {
+ for (Tuple t : tuples) {
+ Object[] data = t.toArray();
+ UUID jobId = (UUID) data[0];
+ UUID stageId = (UUID) data[1];
+ JobPlan plan = (JobPlan) data[2];
+ TupleSet ts = (TupleSet) data[3];
+ ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
+ .size()];
+ int i = 0;
+ for (Tuple t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
+ plan, stageId, (Set<ActivityNodeId>) t2Data[1]);
+ }
+ Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
+ new ClusterControllerService.PortMapMergingAccumulator());
+ ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
+ .size()];
+ i = 0;
+ for (Tuple t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ p2is[i++] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
+ plan, stageId, (Set<ActivityNodeId>) t2Data[1], globalPortMap);
+ }
+ ccs.runRemote(p2is, null);
+ ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
+ .size()];
+ i = 0;
+ for (Tuple t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ p3is[i++] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
+ stageId);
+ }
+ ccs.runRemote(p3is, null);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE));
jolRuntime.evaluate();
@@ -83,10 +158,11 @@
}
@Override
- public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
final UUID jobId = UUID.randomUUID();
- BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, jobFlags));
+ final JobPlanBuilder builder = new JobPlanBuilder();
+ builder.init(jobSpec, jobFlags);
final BasicTupleSet anTuples = new BasicTupleSet();
final BasicTupleSet acTuples = new BasicTupleSet();
@@ -95,30 +171,44 @@
@Override
public void addTask(IActivityNode task) {
anTuples.add(ActivityNodeTable.createTuple(jobId, task));
+ builder.addTask(task);
}
@Override
public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.OUTPUT, operatorOutputIndex,
taskOutputIndex));
+ builder.addTargetEdge(operatorOutputIndex, task, taskOutputIndex);
}
@Override
public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.INPUT, operatorInputIndex,
taskInputIndex));
+ builder.addSourceEdge(operatorInputIndex, task, taskInputIndex);
}
@Override
public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
abTuples.add(ActivityBlockedTable.createTuple(jobId, blocker, blocked));
+ builder.addBlockingEdge(blocker, blocked);
}
};
BasicTupleSet odTuples = new BasicTupleSet();
+ BasicTupleSet olTuples = new BasicTupleSet();
for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : jobSpec.getOperatorMap().entrySet()) {
IOperatorDescriptor od = e.getValue();
odTuples.add(OperatorDescriptorTable.createTuple(jobId, od));
+ PartitionConstraint pc = od.getPartitionConstraint();
+ LocationConstraint[] locationConstraints = pc.getLocationConstraints();
+ String[] partitions = new String[locationConstraints.length];
+ for (int i = 0; i < locationConstraints.length; ++i) {
+ String nodeId = ((AbsoluteLocationConstraint) locationConstraints[i]).getLocationId();
+ olTuples.add(OperatorLocationTable.createTuple(jobId, od.getOperatorId(), nodeId));
+ partitions[i] = nodeId;
+ }
+ od.setPartitions(partitions);
od.contributeTaskGraph(gBuilder);
}
@@ -127,8 +217,11 @@
cdTuples.add(ConnectorDescriptorTable.createTuple(jobId, jobSpec, e.getValue()));
}
+ BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, jobSpec, builder.getPlan()));
+
jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null);
jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, odTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, OperatorLocationTable.TABLE_NAME, olTuples, null);
jolRuntime.schedule(JOL_SCOPE, ConnectorDescriptorTable.TABLE_NAME, cdTuples, null);
jolRuntime.schedule(JOL_SCOPE, ActivityNodeTable.TABLE_NAME, anTuples, null);
jolRuntime.schedule(JOL_SCOPE, ActivityConnectionTable.TABLE_NAME, acTuples, null);
@@ -155,8 +248,13 @@
}
@Override
- public void start(UUID jobId) throws Exception {
+ public synchronized void start(UUID jobId) throws Exception {
+ BasicTupleSet jsTuples = new BasicTupleSet();
+ jsTuples.add(JobStartTable.createTuple(jobId, System.currentTimeMillis()));
+ jolRuntime.schedule(JOL_SCOPE, JobStartTable.TABLE_NAME, jsTuples, null);
+
+ jolRuntime.evaluate();
}
@Override
@@ -165,7 +263,7 @@
}
/*
- * declare(job, keys(0), {JobId, Flags, Status})
+ * declare(job, keys(0), {JobId, Status, JobSpec, JobPlan})
*/
private static class JobTable extends BasicTable {
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "job");
@@ -173,15 +271,15 @@
private static Key PRIMARY_KEY = new Key(0);
private static final Class[] SCHEMA = new Class[] {
- UUID.class, EnumSet.class, JobStatus.class, PerJobCounter.class
+ UUID.class, JobStatus.class, JobSpecification.class, JobPlan.class
};
public JobTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
}
- static Tuple createInitialJobTuple(UUID jobId, EnumSet<JobFlag> jobFlags) {
- return new Tuple(jobId, jobFlags, JobStatus.INITIALIZED, new PerJobCounter());
+ static Tuple createInitialJobTuple(UUID jobId, JobSpecification jobSpec, JobPlan plan) {
+ return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan);
}
}
@@ -207,6 +305,27 @@
}
/*
+     * declare(operatorlocation, keys(), {JobId, ODId, NodeId})
+ */
+ private static class OperatorLocationTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorlocation");
+
+ private static Key PRIMARY_KEY = new Key();
+
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, String.class
+ };
+
+ public OperatorLocationTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId) {
+ return new Tuple(jobId, opId, nodeId);
+ }
+ }
+
+ /*
* declare(connectordescriptor, keys(0, 1), {JobId, CDId, SrcODId, SrcPort, DestODId, DestPort, ConnectorDescriptor})
*/
private static class ConnectorDescriptorTable extends BasicTable {
@@ -240,7 +359,7 @@
}
/*
- * declare(activitynode, keys(0), {JobId, OperatorId, ActivityId, ActivityNode})
+ * declare(activitynode, keys(0, 1, 2), {JobId, OperatorId, ActivityId, ActivityNode})
*/
private static class ActivityNodeTable extends BasicTable {
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activitynode");
@@ -267,7 +386,7 @@
private static class ActivityConnectionTable extends BasicTable {
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityconnection");
- private static Key PRIMARY_KEY = new Key(0);
+ private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
private static final Class[] SCHEMA = new Class[] {
UUID.class, OperatorDescriptorId.class, Integer.class, Direction.class, ActivityNodeId.class, Integer.class
@@ -289,10 +408,14 @@
private static class ActivityBlockedTable extends BasicTable {
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityblocked");
- private static Key PRIMARY_KEY = new Key(0);
+ private static Key PRIMARY_KEY = new Key(0, 1, 2, 3, 4);
private static final Class[] SCHEMA = new Class[] {
- UUID.class, OperatorDescriptorId.class, ActivityNodeId.class, ActivityNodeId.class
+ UUID.class,
+ OperatorDescriptorId.class,
+ ActivityNodeId.class,
+ OperatorDescriptorId.class,
+ ActivityNodeId.class
};
public ActivityBlockedTable(Runtime context) {
@@ -300,8 +423,49 @@
}
static Tuple createTuple(UUID jobId, IActivityNode blocker, IActivityNode blocked) {
- OperatorDescriptorId odId = blocker.getActivityNodeId().getOperatorDescriptorId();
- return new Tuple(jobId, odId, blocker.getActivityNodeId(), blocked.getActivityNodeId());
+ ActivityNodeId blockerANId = blocker.getActivityNodeId();
+ OperatorDescriptorId blockerODId = blockerANId.getOperatorDescriptorId();
+ ActivityNodeId blockedANId = blocked.getActivityNodeId();
+ OperatorDescriptorId blockedODId = blockedANId.getOperatorDescriptorId();
+ return new Tuple(jobId, blockerODId, blockerANId, blockedODId, blockedANId);
+ }
+ }
+
+ /*
+ * declare(jobstart, keys(0), {JobId, SubmitTime})
+ */
+ private static class JobStartTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobstart");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, Long.class
+ };
+
+ public JobStartTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, long submitTime) {
+ return new Tuple(jobId, submitTime);
+ }
+ }
+
+ /*
+ * declare(startmessage, keys(0, 1), {JobId, StageId, JobPlan, TupleSet})
+ */
+ private static class StartMessageTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "startmessage");
+
+ private static Key PRIMARY_KEY = new Key(0, 1);
+
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, UUID.class, JobPlan.class, TupleSet.class
+ };
+
+ public StartMessageTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
}
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
index 828a6db..11a7bdd 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
@@ -23,13 +23,10 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.Vector;
-import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.Endpoint;
-import edu.uci.ics.hyracks.api.controller.INodeController;
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobPlan;
@@ -54,9 +51,8 @@
}
public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- JobPlanBuilder builder = new JobPlanBuilder();
- builder.init(jobSpec, jobFlags);
- JobControl jc = new JobControl(this, builder.plan());
+ JobPlanner planner = new JobPlanner();
+ JobControl jc = new JobControl(this, planner.plan(jobSpec, jobFlags));
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(jc.getJobPlan().toString());
}
@@ -68,8 +64,7 @@
public synchronized void start(UUID jobId) throws Exception {
JobControl jobControlImpl = jobMap.get(jobId);
LOGGER
- .info("Starting job: " + jobControlImpl.getJobId() + ", Current status: "
- + jobControlImpl.getJobStatus());
+ .info("Starting job: " + jobControlImpl.getJobId() + ", Current status: " + jobControlImpl.getJobStatus());
if (jobControlImpl.getJobStatus() != JobStatus.INITIALIZED) {
return;
}
@@ -142,143 +137,39 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Deploying: " + stage);
}
+ UUID jobId = jc.getJobId();
+ JobPlan plan = jc.getJobPlan();
+ UUID stageId = stage.getId();
Set<String> participatingNodes = plan(jc, stage);
StageProgress stageProgress = new StageProgress(stage.getId());
stageProgress.addPendingNodes(participatingNodes);
- Map<PortInstanceId, Endpoint> globalPortMap = runRemote(new Phase1Installer(jc, stage),
- new PortMapMergingAccumulator(), participatingNodes);
- runRemote(new Phase2Installer(jc, stage, globalPortMap), null, participatingNodes);
- runRemote(new Phase3Installer(jc, stage), null, participatingNodes);
+ ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[participatingNodes
+ .size()];
+ ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[participatingNodes
+ .size()];
+ ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[participatingNodes
+ .size()];
+ int i = 0;
+ for (String nodeId : participatingNodes) {
+ p1is[i++] = new ClusterControllerService.Phase1Installer(nodeId, jobId, plan, stageId, stage.getTasks());
+ }
+ Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
+ new ClusterControllerService.PortMapMergingAccumulator());
+ i = 0;
+ for (String nodeId : participatingNodes) {
+ p2is[i++] = new ClusterControllerService.Phase2Installer(nodeId, jobId, plan, stageId, stage.getTasks(),
+ globalPortMap);
+ }
+ ccs.runRemote(p2is, null);
+ i = 0;
+ for (String nodeId : participatingNodes) {
+ p3is[i++] = new ClusterControllerService.Phase3Installer(nodeId, jobId, stageId);
+ }
+ ccs.runRemote(p3is, null);
jc.setStageProgress(stage.getId(), stageProgress);
return participatingNodes;
}
- private interface RemoteOp<T> {
- public T execute(INodeController node) throws Exception;
- }
-
- private interface Accumulator<T, R> {
- public void accumulate(T o);
-
- public R getResult();
- }
-
- private static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
- private JobControl jc;
- private JobStage stage;
-
- public Phase1Installer(JobControl jc, JobStage stage) {
- this.jc = jc;
- this.stage = stage;
- }
-
- @Override
- public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
- return node.initializeJobletPhase1(jc.getJobId(), jc.getJobPlan(), stage);
- }
-
- @Override
- public String toString() {
- return jc.getJobId() + " Distribution Phase 1";
- }
- }
-
- private static class Phase2Installer implements RemoteOp<Void> {
- private JobControl jc;
- private JobStage stage;
- private Map<PortInstanceId, Endpoint> globalPortMap;
-
- public Phase2Installer(JobControl jc, JobStage stage, Map<PortInstanceId, Endpoint> globalPortMap) {
- this.jc = jc;
- this.stage = stage;
- this.globalPortMap = globalPortMap;
- }
-
- @Override
- public Void execute(INodeController node) throws Exception {
- node.initializeJobletPhase2(jc.getJobId(), jc.getJobPlan(), stage, globalPortMap);
- return null;
- }
-
- @Override
- public String toString() {
- return jc.getJobId() + " Distribution Phase 2";
- }
- }
-
- private static class Phase3Installer implements RemoteOp<Void> {
- private JobControl jc;
- private JobStage stage;
-
- public Phase3Installer(JobControl jc, JobStage stage) {
- this.jc = jc;
- this.stage = stage;
- }
-
- @Override
- public Void execute(INodeController node) throws Exception {
- node.commitJobletInitialization(jc.getJobId(), jc.getJobPlan(), stage);
- return null;
- }
-
- @Override
- public String toString() {
- return jc.getJobId() + " Distribution Phase 3";
- }
- }
-
- private static class PortMapMergingAccumulator implements
- Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
- Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
-
- @Override
- public void accumulate(Map<PortInstanceId, Endpoint> o) {
- portMap.putAll(o);
- }
-
- @Override
- public Map<PortInstanceId, Endpoint> getResult() {
- return portMap;
- }
- }
-
- private <T, R> R runRemote(final RemoteOp<T> remoteOp, final Accumulator<T, R> accumulator,
- Set<String> candidateNodes) throws Exception {
- LOGGER.log(Level.INFO, remoteOp + " : " + candidateNodes);
-
- final Semaphore installComplete = new Semaphore(candidateNodes.size());
- final List<Exception> errors = new Vector<Exception>();
- for (final String nodeId : candidateNodes) {
- final INodeController node = ccs.lookupNode(nodeId).getNodeController();
-
- installComplete.acquire();
- Runnable remoteRunner = new Runnable() {
- @Override
- public void run() {
- try {
- T t = remoteOp.execute(node);
- if (accumulator != null) {
- synchronized (accumulator) {
- accumulator.accumulate(t);
- }
- }
- } catch (Exception e) {
- errors.add(e);
- } finally {
- installComplete.release();
- }
- }
- };
-
- ccs.getExecutor().execute(remoteRunner);
- }
- installComplete.acquire(candidateNodes.size());
- if (!errors.isEmpty()) {
- throw errors.get(0);
- }
- return accumulator == null ? null : accumulator.getResult();
- }
-
private Set<String> plan(JobControl jc, JobStage stage) throws Exception {
LOGGER.log(Level.INFO, String.valueOf(jc.getJobId()) + ": Planning");
Set<String> participatingNodes = planner.plan(jc, stage);
@@ -289,7 +180,7 @@
}
public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId,
- StageletStatistics statistics) throws Exception {
+ StageletStatistics statistics) throws Exception {
JobControl jc = jobMap.get(jobId);
if (jc != null) {
jc.notifyStageletComplete(stageId, nodeId, statistics);
@@ -312,10 +203,7 @@
return null;
}
+ @Override
public synchronized void notifyNodeFailure(String nodeId) {
- for(Map.Entry<UUID, JobControl> e : jobMap.entrySet()) {
- JobControl jc = e.getValue();
-
- }
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
index 723736c..d748a13 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
@@ -1,22 +1,7 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package edu.uci.ics.hyracks.controller.clustercontroller;
import java.util.ArrayList;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -24,19 +9,11 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStage;
-import edu.uci.ics.hyracks.api.util.Pair;
-import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
public class JobPlanBuilder implements IActivityGraphBuilder {
private static final Logger LOGGER = Logger.getLogger(JobPlanBuilder.class.getName());
@@ -53,22 +30,22 @@
public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Adding source edge: " + task.getOwner().getOperatorId() + ":" + operatorInputIndex + " -> "
- + task.getActivityNodeId() + ":" + taskInputIndex);
+ + task.getActivityNodeId() + ":" + taskInputIndex);
}
insertIntoIndexedMap(plan.getTaskInputMap(), task.getActivityNodeId(), taskInputIndex, operatorInputIndex);
insertIntoIndexedMap(plan.getOperatorInputMap(), task.getOwner().getOperatorId(), operatorInputIndex, task
- .getActivityNodeId());
+ .getActivityNodeId());
}
@Override
public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Adding target edge: " + task.getOwner().getOperatorId() + ":" + operatorOutputIndex + " -> "
- + task.getActivityNodeId() + ":" + taskOutputIndex);
+ + task.getActivityNodeId() + ":" + taskOutputIndex);
}
insertIntoIndexedMap(plan.getTaskOutputMap(), task.getActivityNodeId(), taskOutputIndex, operatorOutputIndex);
insertIntoIndexedMap(plan.getOperatorOutputMap(), task.getOwner().getOperatorId(), operatorOutputIndex, task
- .getActivityNodeId());
+ .getActivityNodeId());
}
@Override
@@ -93,98 +70,6 @@
}
}
- private Pair<ActivityNodeId, ActivityNodeId> findMergePair(JobPlan plan, JobSpecification spec, Set<JobStage> eqSets) {
- Map<ActivityNodeId, IActivityNode> activityNodeMap = plan.getActivityNodeMap();
- for (JobStage eqSet : eqSets) {
- for (ActivityNodeId t : eqSet.getTasks()) {
- IOperatorDescriptor owner = activityNodeMap.get(t).getOwner();
- List<Integer> inputList = plan.getTaskInputMap().get(t);
- if (inputList != null) {
- for (Integer idx : inputList) {
- IConnectorDescriptor conn = spec.getInputConnectorDescriptor(owner, idx);
- OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
- int producerOutputIndex = spec.getProducerOutputIndex(conn);
- ActivityNodeId inTask = plan.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
- if (!eqSet.getTasks().contains(inTask)) {
- return new Pair<ActivityNodeId, ActivityNodeId>(t, inTask);
- }
- }
- }
- List<Integer> outputList = plan.getTaskOutputMap().get(t);
- if (outputList != null) {
- for (Integer idx : outputList) {
- IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(owner, idx);
- OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
- int consumerInputIndex = spec.getConsumerInputIndex(conn);
- ActivityNodeId outTask = plan.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
- if (!eqSet.getTasks().contains(outTask)) {
- return new Pair<ActivityNodeId, ActivityNodeId>(t, outTask);
- }
- }
- }
- }
- }
- return null;
- }
-
- private JobStage inferStages() throws Exception {
- JobSpecification spec = plan.getJobSpecification();
-
- /*
- * Build initial equivalence sets map. We create a map such that for
- * each IOperatorTask, t -> { t }
- */
- Map<ActivityNodeId, JobStage> stageMap = new HashMap<ActivityNodeId, JobStage>();
- Set<JobStage> stages = new HashSet<JobStage>();
- for (Set<ActivityNodeId> taskIds : plan.getOperatorTaskMap().values()) {
- for (ActivityNodeId taskId : taskIds) {
- Set<ActivityNodeId> eqSet = new HashSet<ActivityNodeId>();
- eqSet.add(taskId);
- JobStage stage = new JobStage(eqSet);
- stageMap.put(taskId, stage);
- stages.add(stage);
- }
- }
-
- boolean changed = true;
- while (changed) {
- changed = false;
- Pair<ActivityNodeId, ActivityNodeId> pair = findMergePair(plan, spec, stages);
- if (pair != null) {
- merge(stageMap, stages, pair.first, pair.second);
- changed = true;
- }
- }
-
- JobStage endStage = new JobStage(new HashSet<ActivityNodeId>());
- Map<ActivityNodeId, Set<ActivityNodeId>> blocker2BlockedMap = plan.getBlocker2BlockedMap();
- for (JobStage s : stages) {
- endStage.addDependency(s);
- s.addDependent(endStage);
- Set<JobStage> blockedStages = new HashSet<JobStage>();
- for (ActivityNodeId t : s.getTasks()) {
- Set<ActivityNodeId> blockedTasks = blocker2BlockedMap.get(t);
- if (blockedTasks != null) {
- for (ActivityNodeId bt : blockedTasks) {
- blockedStages.add(stageMap.get(bt));
- }
- }
- }
- for (JobStage bs : blockedStages) {
- bs.addDependency(s);
- s.addDependent(bs);
- }
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Inferred " + (stages.size() + 1) + " stages");
- for (JobStage s : stages) {
- LOGGER.info(s.toString());
- }
- LOGGER.info("SID: ENDSTAGE");
- }
- return endStage;
- }
-
public void init(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
plan = new JobPlan(jobSpec, jobFlags);
}
@@ -199,37 +84,7 @@
vList.set(index, value);
}
- private void merge(Map<ActivityNodeId, JobStage> eqSetMap, Set<JobStage> eqSets, ActivityNodeId t1,
- ActivityNodeId t2) {
- JobStage stage1 = eqSetMap.get(t1);
- Set<ActivityNodeId> s1 = stage1.getTasks();
- JobStage stage2 = eqSetMap.get(t2);
- Set<ActivityNodeId> s2 = stage2.getTasks();
-
- Set<ActivityNodeId> mergedSet = new HashSet<ActivityNodeId>();
- mergedSet.addAll(s1);
- mergedSet.addAll(s2);
-
- eqSets.remove(stage1);
- eqSets.remove(stage2);
- JobStage mergedStage = new JobStage(mergedSet);
- eqSets.add(mergedStage);
-
- for (ActivityNodeId t : mergedSet) {
- eqSetMap.put(t, mergedStage);
- }
- }
-
- public JobPlan plan() throws Exception {
- PlanUtils.visit(plan.getJobSpecification(), new IOperatorDescriptorVisitor() {
- @Override
- public void visit(IOperatorDescriptor op) throws Exception {
- op.contributeTaskGraph(JobPlanBuilder.this);
- }
- });
- JobStage endStage = inferStages();
- plan.setEndStage(endStage);
-
+ public JobPlan getPlan() {
return plan;
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java
new file mode 100644
index 0000000..f992de1
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStage;
+import edu.uci.ics.hyracks.api.util.Pair;
+import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
+import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
+
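+/**
+ * Builds a JobPlan from a JobSpecification: each operator contributes its
+ * activity graph through a JobPlanBuilder, and connected activities are then
+ * merged into JobStages, ending with a synthetic end stage.
+ */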
+public class JobPlanner {
+ private static final Logger LOGGER = Logger.getLogger(JobPlanner.class.getName());
+
+ private Pair<ActivityNodeId, ActivityNodeId> findMergePair(JobPlan plan, JobSpecification spec, Set<JobStage> eqSets) {
+ Map<ActivityNodeId, IActivityNode> activityNodeMap = plan.getActivityNodeMap();
+ for (JobStage eqSet : eqSets) {
+ for (ActivityNodeId t : eqSet.getTasks()) {
+ IOperatorDescriptor owner = activityNodeMap.get(t).getOwner();
+ List<Integer> inputList = plan.getTaskInputMap().get(t);
+ if (inputList != null) {
+ for (Integer idx : inputList) {
+ IConnectorDescriptor conn = spec.getInputConnectorDescriptor(owner, idx);
+ OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
+ int producerOutputIndex = spec.getProducerOutputIndex(conn);
+ ActivityNodeId inTask = plan.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
+ if (!eqSet.getTasks().contains(inTask)) {
+ return new Pair<ActivityNodeId, ActivityNodeId>(t, inTask);
+ }
+ }
+ }
+ List<Integer> outputList = plan.getTaskOutputMap().get(t);
+ if (outputList != null) {
+ for (Integer idx : outputList) {
+ IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(owner, idx);
+ OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
+ int consumerInputIndex = spec.getConsumerInputIndex(conn);
+ ActivityNodeId outTask = plan.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
+ if (!eqSet.getTasks().contains(outTask)) {
+ return new Pair<ActivityNodeId, ActivityNodeId>(t, outTask);
+ }
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ private JobStage inferStages(JobPlan plan) throws Exception {
+ JobSpecification spec = plan.getJobSpecification();
+
+ /*
+ * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
+ */
+ Map<ActivityNodeId, JobStage> stageMap = new HashMap<ActivityNodeId, JobStage>();
+ Set<JobStage> stages = new HashSet<JobStage>();
+ for (Set<ActivityNodeId> taskIds : plan.getOperatorTaskMap().values()) {
+ for (ActivityNodeId taskId : taskIds) {
+ Set<ActivityNodeId> eqSet = new HashSet<ActivityNodeId>();
+ eqSet.add(taskId);
+ JobStage stage = new JobStage(eqSet);
+ stageMap.put(taskId, stage);
+ stages.add(stage);
+ }
+ }
+
+ boolean changed = true;
+ while (changed) {
+ changed = false;
+ Pair<ActivityNodeId, ActivityNodeId> pair = findMergePair(plan, spec, stages);
+ if (pair != null) {
+ merge(stageMap, stages, pair.first, pair.second);
+ changed = true;
+ }
+ }
+
+ JobStage endStage = new JobStage(new HashSet<ActivityNodeId>());
+ Map<ActivityNodeId, Set<ActivityNodeId>> blocker2BlockedMap = plan.getBlocker2BlockedMap();
+ for (JobStage s : stages) {
+ endStage.addDependency(s);
+ s.addDependent(endStage);
+ Set<JobStage> blockedStages = new HashSet<JobStage>();
+ for (ActivityNodeId t : s.getTasks()) {
+ Set<ActivityNodeId> blockedTasks = blocker2BlockedMap.get(t);
+ if (blockedTasks != null) {
+ for (ActivityNodeId bt : blockedTasks) {
+ blockedStages.add(stageMap.get(bt));
+ }
+ }
+ }
+ for (JobStage bs : blockedStages) {
+ bs.addDependency(s);
+ s.addDependent(bs);
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Inferred " + (stages.size() + 1) + " stages");
+ for (JobStage s : stages) {
+ LOGGER.info(s.toString());
+ }
+ LOGGER.info("SID: ENDSTAGE");
+ }
+ return endStage;
+ }
+
+ private void merge(Map<ActivityNodeId, JobStage> eqSetMap, Set<JobStage> eqSets, ActivityNodeId t1,
+ ActivityNodeId t2) {
+ JobStage stage1 = eqSetMap.get(t1);
+ Set<ActivityNodeId> s1 = stage1.getTasks();
+ JobStage stage2 = eqSetMap.get(t2);
+ Set<ActivityNodeId> s2 = stage2.getTasks();
+
+ Set<ActivityNodeId> mergedSet = new HashSet<ActivityNodeId>();
+ mergedSet.addAll(s1);
+ mergedSet.addAll(s2);
+
+ eqSets.remove(stage1);
+ eqSets.remove(stage2);
+ JobStage mergedStage = new JobStage(mergedSet);
+ eqSets.add(mergedStage);
+
+ for (ActivityNodeId t : mergedSet) {
+ eqSetMap.put(t, mergedStage);
+ }
+ }
+
+ public JobPlan plan(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ final JobPlanBuilder builder = new JobPlanBuilder();
+ builder.init(jobSpec, jobFlags);
+ PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
+ @Override
+ public void visit(IOperatorDescriptor op) throws Exception {
+ op.contributeTaskGraph(builder);
+ }
+ });
+ JobPlan plan = builder.getPlan();
+ JobStage endStage = inferStages(plan);
+ plan.setEndStage(endStage);
+
+ return plan;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/PerJobCounter.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/PerJobCounter.java
deleted file mode 100644
index 4644daf..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/PerJobCounter.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package edu.uci.ics.hyracks.controller.clustercontroller;
-
-public class PerJobCounter {
- private int stageCounter;
-
- public PerJobCounter() {
- }
-
- public int getStageCounterAndIncrement() {
- return stageCounter++;
- }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index f8f6806..cde2ec4 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
@@ -54,7 +55,6 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStage;
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
import edu.uci.ics.hyracks.comm.ConnectionManager;
import edu.uci.ics.hyracks.comm.DemuxDataReceiveListenerFactory;
@@ -150,7 +150,7 @@
Matcher m = pattern.matcher(ipaddrStr);
if (!m.matches()) {
throw new Exception(MessageFormat.format(
- "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
+ "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
}
byte[] ipBytes = new byte[4];
ipBytes[0] = (byte) Integer.parseInt(m.group(1));
@@ -161,22 +161,21 @@
}
@Override
- public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, JobStage stage)
- throws Exception {
- LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stage.getId()
- + "]: Initializing Joblet Phase 1");
+ public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId,
+ Set<ActivityNodeId> activities) throws Exception {
+ LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
final Joblet joblet = getLocalJoblet(jobId);
- Stagelet stagelet = new Stagelet(joblet, stage.getId(), id);
- joblet.setStagelet(stage.getId(), stagelet);
+ Stagelet stagelet = new Stagelet(joblet, stageId, id);
+ joblet.setStagelet(stageId, stagelet);
final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
List<Endpoint> endpointList = new ArrayList<Endpoint>();
- for (ActivityNodeId hanId : stage.getTasks()) {
+ for (ActivityNodeId hanId : activities) {
IActivityNode han = plan.getActivityNodeMap().get(hanId);
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Initializing " + hanId + " -> " + han);
@@ -200,8 +199,8 @@
endpointList.add(endpoint);
DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx);
connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
- PortInstanceId piId = new PortInstanceId(op.getOperatorId(),
- Direction.INPUT, plan.getTaskInputMap().get(hanId).get(j), i);
+ PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
+ .getTaskInputMap().get(hanId).get(j), i);
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
}
@@ -221,7 +220,7 @@
}
private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
- final int receiverIndex, JobPlan plan, final Stagelet stagelet) throws HyracksDataException {
+ final int receiverIndex, JobPlan plan, final Stagelet stagelet) throws HyracksDataException {
final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex);
return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
@@ -246,26 +245,25 @@
public void close() throws HyracksDataException {
reader.close();
stagelet.getStatistics().getStatisticsMap().put(
- "framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
- String.valueOf(frameCount));
+ "framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
+ String.valueOf(frameCount));
}
} : reader;
}
@Override
- public void initializeJobletPhase2(UUID jobId, final JobPlan plan, JobStage stage,
- final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
- LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stage.getId()
- + "]: Initializing Joblet Phase 2");
+ public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId, Set<ActivityNodeId> activities,
+ final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
+ LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
final Joblet ji = getLocalJoblet(jobId);
- Stagelet si = (Stagelet) ji.getStagelet(stage.getId());
+ Stagelet si = (Stagelet) ji.getStagelet(stageId);
final Map<OperatorInstanceId, OperatorRunnable> honMap = si.getOperatorMap();
- final Stagelet stagelet = (Stagelet) ji.getStagelet(stage.getId());
+ final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
final JobSpecification spec = plan.getJobSpecification();
- for (ActivityNodeId hanId : stage.getTasks()) {
+ for (ActivityNodeId hanId : activities) {
IActivityNode han = plan.getActivityNodeMap().get(hanId);
IOperatorDescriptor op = han.getOwner();
List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
@@ -282,13 +280,13 @@
@Override
public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
- Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+ Direction.INPUT, spec.getConsumerInputIndex(conn), index);
Endpoint ep = globalPortMap.get(piId);
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
}
return createWriter(connectionManager.connect(ep.getNetworkAddress(), ep
- .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+ .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
}
};
or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i));
@@ -301,7 +299,7 @@
}
private IFrameWriter createWriter(final IFrameWriter writer, JobPlan plan, final IConnectorDescriptor conn,
- final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
+ final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameWriter() {
private int frameCount;
@@ -321,16 +319,16 @@
public void close() throws HyracksDataException {
writer.close();
stagelet.getStatistics().getStatisticsMap().put(
- "framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex,
- String.valueOf(frameCount));
+ "framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex,
+ String.valueOf(frameCount));
}
} : writer;
}
@Override
- public void commitJobletInitialization(UUID jobId, JobPlan plan, JobStage stage) throws Exception {
+ public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception {
final Joblet ji = getLocalJoblet(jobId);
- Stagelet si = (Stagelet) ji.getStagelet(stage.getId());
+ Stagelet si = (Stagelet) ji.getStagelet(stageId);
for (Endpoint e : si.getEndpointList()) {
connectionManager.unacceptConnection(e.getEndpointId());
}
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index abba642..dd62758 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -1,50 +1,115 @@
program hyrackscc;
import java.util.UUID;
+import java.util.Set;
+
+import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
+
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.JobPlan;
-define(testOp, keys(0, 1), {UUID, OperatorDescriptorId, IOperatorDescriptor});
-define(testConn, keys(0, 1), {UUID, ConnectorDescriptorId, OperatorDescriptorId, Integer, OperatorDescriptorId, Integer});
+define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
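+
+/*
+ * Stage numbers are computed as a fixpoint: every activity starts in stage 0,
+ * a blocked activity is pushed one stage past its blocker whenever their
+ * stage numbers coincide, and the two endpoints of a connector are lifted to
+ * the larger of their stage numbers.
+ */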
-testOp(JobId, Opid, Op) :-
- operatordescriptor(JobId, Opid, Op)
+activitystage(JobId, OperatorId, ActivityId, 0) :-
+ activitynode(JobId, OperatorId, ActivityId, _);
+
+activitystage(JobId, OperatorId2, ActivityId2, StageNumber) :-
+ activitystage(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityblocked(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
+ StageNumber1 == StageNumber2
{
- java.lang.System.err.println(Opid);
+ StageNumber := StageNumber1 + 1;
};
-testConn(JobId, Cid, SOpid, SPort, DOpid, DPort) :-
- connectordescriptor(JobId, Cid, SOpid, SPort, DOpid, DPort, _)
- {
- java.lang.System.err.println(Cid.toString() + " " + SOpid.toString() + ":" + SPort.toString() + " -> " + DOpid.toString() + ":" + DPort.toString());
- };
-
-define(activitystage, keys(0, 1, 2, 3, 4), {UUID, OperatorDescriptorId, ActivityNodeId, OperatorDescriptorId, ActivityNodeId});
-
-activitystage(JobId, OperatorId, ActivityId, OperatorId, ActivityId) :-
- activitynode(JobId, OperatorId, ActivityId, _)
- {
- java.lang.System.err.println("INITIAL: " + JobId.toString() + " " + OperatorId.toString() + " " + ActivityId.toString());
- };
-
-activitystage(JobId, OperatorId2, ActivityId2, OperatorId1, ActivityId1) :-
- activitystage(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2)
- {
- java.lang.System.err.println("CHANGE1: " + JobId.toString() + " " + OperatorId2.toString() + " " + ActivityId2.toString() + " " + OperatorId1.toString() + " " + ActivityId1.toString());
- };
-
-activitystage(JobId, OperatorId1, ActivityId1, OperatorId3, ActivityId3) :-
- activitystage(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
- activitynode(JobId, OperatorId3, ActivityId3, _),
+activitystage(JobId, OperatorId1, ActivityId1, StageNumber) :-
+ activitystage(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityconnection(JobId, OperatorId1, Operator1Port, _, ActivityId1, _),
activityconnection(JobId, OperatorId2, Operator2Port, _, ActivityId2, _),
- activityconnection(JobId, OperatorId3, Operator3Port, _, ActivityId3, _),
- connectordescriptor(JobId, _, OperatorId2, Operator2Port, OperatorId3, Operator3Port, _)
+ connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+ StageNumber1 != StageNumber2
{
- java.lang.System.err.println("CHANGE2: " + JobId.toString() + " " + OperatorId1.toString() + " " + ActivityId1.toString() + " " + OperatorId3.toString() + " " + ActivityId3.toString());
+ StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
};
-watch(activitynode, a);
-watch(activitystage, a);
+activitystage(JobId, OperatorId2, ActivityId2, StageNumber) :-
+ activitystage(JobId, OperatorId1, ActivityId1, StageNumber1),
+ activitystage(JobId, OperatorId2, ActivityId2, StageNumber2),
+ activityconnection(JobId, OperatorId1, Operator1Port, _, ActivityId1, _),
+ activityconnection(JobId, OperatorId2, Operator2Port, _, ActivityId2, _),
+ connectordescriptor(JobId, _, OperatorId2, Operator2Port, OperatorId1, Operator1Port, _),
+ StageNumber1 != StageNumber2
+ {
+ StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+ };
+
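+/* Assign a freshly generated stage id (UUID) to each (job, stage number) pair. */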
+define(jobstage, keys(0, 1), {UUID, Integer, UUID});
+
+jobstage(JobId, StageNumber, StageId) :-
+ activitystage(JobId, _, _, StageNumber)
+ {
+ StageId := java.util.UUID.randomUUID();
+ };
+
+watch(jobstage, a);
+
+define(stagestart, keys(0), {UUID, Integer});
+define(stagefinish, keys(), {UUID, Integer});
+
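+/*
+ * stagestart tracks the stage a job is currently launching: it is seeded at 0
+ * when a jobstart tuple arrives for an INITIALIZED job and advanced once the
+ * previous stage finishes.
+ */
+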
+watch(jobstart, i);
+
+stagestart_INITIAL stagestart(JobId, 0) :-
+ jobstart(JobId, _),
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _),
+ notin stagestart(JobId, _);
+
+/*
+update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan) :-
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan),
+ jobstart(JobId, _);
+*/
+
+stagestart_NEXT stagestart(JobId, NextStageNumber) :-
+ stagestart(JobId, StageNumber),
+ stagefinish(StageId, StageNumber)
+ {
+ NextStageNumber := StageNumber + 1;
+ };
+
+watch(stagestart, a);
+watch(stagestart, d);
+
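+/*
+ * activitystart fans the current stage out to the cluster: one tuple per
+ * activity in the stage, per node that hosts the activity's operator.
+ */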
+define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, UUID, String});
+
+activitystart(JobId, OperatorId, ActivityId, StageNumber, StageId, NodeId) :-
+ stagestart(JobId, StageNumber),
+ operatordescriptor(JobId, OperatorId, _),
+ activitystage(JobId, OperatorId, ActivityId, StageNumber),
+ jobstage(JobId, StageNumber, StageId),
+ operatorlocation(JobId, OperatorId, NodeId);
+
+watch(activitystart, a);
+
+define(stageletstart, keys(0, 1), {UUID, UUID, JobPlan, String, ActivityNodeId});
+
+stageletstart(JobId, StageId, JobPlan, NodeId, ActivityId) :-
+ activitystart(JobId, _, ActivityId, _, StageId, NodeId),
+ job(JobId, _, _, JobPlan);
+
+watch(stageletstart, a);
+watch(stageletstart, i);
+
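+/*
+ * Collapse the stagelets of a stage into a single startmessage per (job,
+ * stage); the tupleset aggregate gathers one (NodeId, ActivityId) tuple per
+ * stagelet start, which the StartMessageTable callback in JOLJobManagerImpl
+ * unpacks to drive the three-phase installation.
+ */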
+startmessage(JobId, StageId, JobPlan, tupleset<Tuple>) :-
+ stageletstart(JobId, StageId, JobPlan, NodeId, ActivityId)
+ {
+ Tuple := new Tuple([NodeId, ActivityId]);
+ };
+
+watch(startmessage, i);
\ No newline at end of file