Added scheduling and fault recovery. Completely removed the Java-based JobManager.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@20 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
new file mode 100644
index 0000000..c14345f
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+public class ChoiceLocationConstraint extends LocationConstraint {
+ private static final long serialVersionUID = 1L;
+
+ private LocationConstraint[] choices;
+
+ public ChoiceLocationConstraint(LocationConstraint... choices) {
+ this.choices = choices;
+ }
+
+ public LocationConstraint[] getChoices() {
+ return choices;
+ }
+
+ @Override
+ public ConstraintType getConstraintType() {
+ return ConstraintType.CHOICE;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
index e88a1b2..d34b6b2 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
@@ -21,6 +21,7 @@
public enum ConstraintType {
ABSOLUTE,
+ CHOICE
}
public abstract ConstraintType getConstraintType();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
index 81b16d9..6461a05 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
@@ -33,9 +33,20 @@
public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
StageletStatistics statistics) throws Exception;
+ public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
+
public void nodeHeartbeat(String id) throws Exception;
/*
+ * Client Application Control methods.
+ */
+ public void createApplication(String appName) throws Exception;
+
+ public void startApplication(String appName) throws Exception;
+
+ public void destroyApplication(String appName) throws Exception;
+
+ /*
* Client Job Control methods.
*/
public UUID createJob(JobSpecification jobSpec) throws Exception;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
index d349edf..264254e 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.api.comm.Endpoint;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.job.JobPlan;
import edu.uci.ics.hyracks.config.NCConfig;
@@ -33,10 +34,11 @@
public NodeCapability getNodeCapability() throws Exception;
public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
- Set<ActivityNodeId> activities) throws Exception;
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) throws Exception;
- public void initializeJobletPhase2(UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> activities,
- Map<PortInstanceId, Endpoint> globalPortMap) throws Exception;
+ public void initializeJobletPhase2(UUID jobId, JobPlan plan, UUID stageId, Map<ActivityNodeId, Set<Integer>> tasks,
+ Map<OperatorDescriptorId, Set<Integer>> opPartitions, Map<PortInstanceId, Endpoint> globalPortMap)
+ throws Exception;
public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 600fd41..b3feef8 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -50,11 +50,15 @@
* Endpoint writer factory.
* @param index
* ordinal index of the data producer partition.
+ * @param nProducerPartitions
+ * Number of partitions of the producing operator.
+ * @param nConsumerPartitions
+ * Number of partitions of the consuming operator.
* @return data writer.
* @throws Exception
*/
public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index) throws HyracksDataException;
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
/**
* Factory metod to create the receive side reader that reads data from this connector.
@@ -67,11 +71,15 @@
* Connection Demultiplexer
* @param index
* ordinal index of the data consumer partition
+ * @param nProducerPartitions
+ * Number of partitions of the producing operator.
+ * @param nConsumerPartitions
+ * Number of partitions of the consuming operator.
* @return data reader
* @throws HyracksDataException
*/
public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index) throws HyracksDataException;
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
/**
* Translate this connector descriptor to JSON.
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index 17a0fc5..ad51370 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -65,21 +65,6 @@
public void setPartitionConstraint(PartitionConstraint partitionConstraint);
/**
- * Returns the final partition locations selected for scheduling. These are decided by Hyracks such that they satisfy the partition constraints.
- *
- * @return array indicating number and node ids of the nodes to schedule the operator runtimes.
- */
- public String[] getPartitions();
-
- /**
- * Sets the partition locations.
- *
- * @param partitions
- * node ids to schedule the operators.
- */
- public void setPartitions(String[] partitions);
-
- /**
* Gets the output record descriptor
*
* @return Array of RecordDescriptor, one per output.
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
index 081c3f2..3d3652a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
@@ -23,8 +23,8 @@
@Option(name = "-http-port", usage = "Sets the http port for the admin console")
public int httpPort;
- @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in seconds (default: 10)")
- public int heartbeatPeriod = 10;
+ @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
+ public int heartbeatPeriod = 10000;
@Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
public int maxHeartbeatLapsePeriods = 5;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
index 4bbd96a..af60240 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -52,6 +52,7 @@
import edu.uci.ics.hyracks.api.controller.INodeController;
import edu.uci.ics.hyracks.api.controller.NodeParameters;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobPlan;
@@ -85,17 +86,12 @@
public ClusterControllerService(CCConfig ccConfig) throws Exception {
this.ccConfig = ccConfig;
nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
- if (ccConfig.useJOL) {
- Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL
- : new HashSet<DebugLevel>();
- jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
- jobManager = new JOLJobManagerImpl(this, jolRuntime);
- } else {
- jobManager = new JobManagerImpl(this);
- }
+ Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL : new HashSet<DebugLevel>();
+ jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
+ jobManager = new JOLJobManagerImpl(this, jolRuntime);
taskExecutor = Executors.newCachedThreadPool();
webServer = new WebServer(new Handler[] {
- getAdminConsoleHandler()
+ getAdminConsoleHandler(), getApplicationDataHandler()
});
this.timer = new Timer(true);
}
@@ -107,7 +103,7 @@
registry.rebind(IClusterController.class.getName(), this);
webServer.setPort(ccConfig.httpPort);
webServer.start();
- timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod * 1000);
+ timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod);
LOGGER.log(Level.INFO, "Started ClusterControllerService");
}
@@ -139,6 +135,7 @@
nodeRegistry.put(id, state);
}
nodeController.notifyRegistration(this);
+ jobManager.registerNode(id);
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
NodeParameters params = new NodeParameters();
params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
@@ -184,6 +181,11 @@
}
@Override
+ public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
+ jobManager.notifyStageletFailure(jobId, stageId, attempt, nodeId);
+ }
+
+ @Override
public JobStatus getJobStatus(UUID jobId) throws Exception {
return jobManager.getJobStatus(jobId);
}
@@ -235,6 +237,17 @@
return handler;
}
+ private Handler getApplicationDataHandler() {
+ ContextHandler handler = new ContextHandler("/applications");
+ handler.setHandler(new AbstractHandler() {
+ @Override
+ public void handle(String target, Request baseRequest, HttpServletRequest request,
+ HttpServletResponse response) throws IOException, ServletException {
+ }
+ });
+ return handler;
+ }
+
@Override
public Map<String, INodeController> getRegistry() throws Exception {
Map<String, INodeController> map = new HashMap<String, INodeController>();
@@ -273,6 +286,9 @@
}
for (String deadNode : deadNodes) {
try {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Killing node: " + deadNode);
+ }
killNode(deadNode);
} catch (Exception e) {
e.printStackTrace();
@@ -298,7 +314,8 @@
final Semaphore installComplete = new Semaphore(remoteOps.length);
final List<Exception> errors = new Vector<Exception>();
for (final RemoteOp<T> remoteOp : remoteOps) {
- final INodeController node = lookupNode(remoteOp.getNodeId()).getNodeController();
+ NodeControllerState nodeState = lookupNode(remoteOp.getNodeId());
+ final INodeController node = nodeState.getNodeController();
installComplete.acquire();
Runnable remoteRunner = new Runnable() {
@@ -334,21 +351,23 @@
private JobPlan plan;
private UUID stageId;
private int attempt;
- private Set<ActivityNodeId> tasks;
+ private Map<ActivityNodeId, Set<Integer>> tasks;
+ private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, int attempt,
- Set<ActivityNodeId> tasks) {
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
this.nodeId = nodeId;
this.jobId = jobId;
this.plan = plan;
this.stageId = stageId;
this.attempt = attempt;
this.tasks = tasks;
+ this.opPartitions = opPartitions;
}
@Override
public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
- return node.initializeJobletPhase1(jobId, plan, stageId, attempt, tasks);
+ return node.initializeJobletPhase1(jobId, plan, stageId, attempt, tasks, opPartitions);
}
@Override
@@ -367,22 +386,25 @@
private UUID jobId;
private JobPlan plan;
private UUID stageId;
- private Set<ActivityNodeId> tasks;
+ private Map<ActivityNodeId, Set<Integer>> tasks;
+ private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
private Map<PortInstanceId, Endpoint> globalPortMap;
- public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, Set<ActivityNodeId> tasks,
+ public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId,
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
Map<PortInstanceId, Endpoint> globalPortMap) {
this.nodeId = nodeId;
this.jobId = jobId;
this.plan = plan;
this.stageId = stageId;
this.tasks = tasks;
+ this.opPartitions = opPartitions;
this.globalPortMap = globalPortMap;
}
@Override
public Void execute(INodeController node) throws Exception {
- node.initializeJobletPhase2(jobId, plan, stageId, tasks, globalPortMap);
+ node.initializeJobletPhase2(jobId, plan, stageId, tasks, opPartitions, globalPortMap);
return null;
}
@@ -481,6 +503,32 @@
}
}
+ static class JobCompleteNotifier implements RemoteOp<Void> {
+ private String nodeId;
+ private UUID jobId;
+
+ public JobCompleteNotifier(String nodeId, UUID jobId) {
+ this.nodeId = nodeId;
+ this.jobId = jobId;
+ }
+
+ @Override
+ public Void execute(INodeController node) throws Exception {
+ node.cleanUpJob(jobId);
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return jobId + " Cleaning Up";
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
static class PortMapMergingAccumulator implements
Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
@@ -495,4 +543,19 @@
return portMap;
}
}
+
+ @Override
+ public void createApplication(String appName) throws Exception {
+
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+
+ }
+
+ @Override
+ public void startApplication(String appName) throws Exception {
+
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
index a13dc7e..e364423 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
@@ -28,14 +28,16 @@
public void start(UUID jobId) throws Exception;
- public void advanceJob(JobControl jobControlImpl) throws Exception;
-
public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
StageletStatistics statistics) throws Exception;
+ public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
+
public JobStatus getJobStatus(UUID jobId);
public JobStatistics waitForCompletion(UUID jobId) throws Exception;
public void notifyNodeFailure(String nodeId) throws Exception;
+
+ public void registerNode(String nodeId) throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobPlanner.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobPlanner.java
deleted file mode 100644
index e9bffba..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobPlanner.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package edu.uci.ics.hyracks.controller.clustercontroller;
-
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.job.JobStage;
-
-public interface IJobPlanner {
- public Set<String> plan(JobControl jobControl, JobStage stage) throws Exception;
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 8917959..4041850 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.controller.clustercontroller;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -32,6 +33,7 @@
import jol.types.table.TableName;
import edu.uci.ics.hyracks.api.comm.Endpoint;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
@@ -74,10 +76,18 @@
private final JobStartTable jobStartTable;
+ private final JobCleanUpTable jobCleanUpTable;
+
+ private final JobCleanUpCompleteTable jobCleanUpCompleteTable;
+
private final StartMessageTable startMessageTable;
private final StageletCompleteTable stageletCompleteTable;
+ private final StageletFailureTable stageletFailureTable;
+
+ private final AvailableNodesTable availableNodesTable;
+
private final FailedNodesTable failedNodesTable;
private final AbortMessageTable abortMessageTable;
@@ -94,8 +104,12 @@
this.acTable = new ActivityConnectionTable(jolRuntime);
this.abTable = new ActivityBlockedTable(jolRuntime);
this.jobStartTable = new JobStartTable();
+ this.jobCleanUpTable = new JobCleanUpTable(jolRuntime);
+ this.jobCleanUpCompleteTable = new JobCleanUpCompleteTable();
this.startMessageTable = new StartMessageTable(jolRuntime);
this.stageletCompleteTable = new StageletCompleteTable(jolRuntime);
+ this.stageletFailureTable = new StageletFailureTable(jolRuntime);
+ this.availableNodesTable = new AvailableNodesTable(jolRuntime);
this.failedNodesTable = new FailedNodesTable(jolRuntime);
this.abortMessageTable = new AbortMessageTable(jolRuntime);
this.abortNotifyTable = new AbortNotifyTable(jolRuntime);
@@ -108,8 +122,12 @@
jolRuntime.catalog().register(acTable);
jolRuntime.catalog().register(abTable);
jolRuntime.catalog().register(jobStartTable);
+ jolRuntime.catalog().register(jobCleanUpTable);
+ jolRuntime.catalog().register(jobCleanUpCompleteTable);
jolRuntime.catalog().register(startMessageTable);
jolRuntime.catalog().register(stageletCompleteTable);
+ jolRuntime.catalog().register(stageletFailureTable);
+ jolRuntime.catalog().register(availableNodesTable);
jolRuntime.catalog().register(failedNodesTable);
jolRuntime.catalog().register(abortMessageTable);
jolRuntime.catalog().register(abortNotifyTable);
@@ -144,13 +162,40 @@
Integer attempt = (Integer) data[2];
JobPlan plan = (JobPlan) data[3];
Set<List> ts = (Set<List>) data[4];
+ Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ Set<List> activityInfoSet = (Set<List>) t2Data[1];
+ for (List l : activityInfoSet) {
+ Object[] lData = l.toArray();
+ ActivityNodeId aid = (ActivityNodeId) lData[0];
+ Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
+ if (opParts == null) {
+ opParts = new HashSet<Integer>();
+ opPartitions.put(aid.getOperatorDescriptorId(), opParts);
+ }
+ opParts.add((Integer) lData[1]);
+ }
+ }
ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
.size()];
int i = 0;
for (List t2 : ts) {
Object[] t2Data = t2.toArray();
+ Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+ Set<List> activityInfoSet = (Set<List>) t2Data[1];
+ for (List l : activityInfoSet) {
+ Object[] lData = l.toArray();
+ ActivityNodeId aid = (ActivityNodeId) lData[0];
+ Set<Integer> aParts = tasks.get(aid);
+ if (aParts == null) {
+ aParts = new HashSet<Integer>();
+ tasks.put(aid, aParts);
+ }
+ aParts.add((Integer) lData[1]);
+ }
p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
- plan, stageId, attempt, (Set<ActivityNodeId>) t2Data[1]);
+ plan, stageId, attempt, tasks, opPartitions);
}
Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
new ClusterControllerService.PortMapMergingAccumulator());
@@ -163,8 +208,20 @@
i = 0;
for (List t2 : ts) {
Object[] t2Data = t2.toArray();
+ Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+ Set<List> activityInfoSet = (Set<List>) t2Data[1];
+ for (List l : activityInfoSet) {
+ Object[] lData = l.toArray();
+ ActivityNodeId aid = (ActivityNodeId) lData[0];
+ Set<Integer> aParts = tasks.get(aid);
+ if (aParts == null) {
+ aParts = new HashSet<Integer>();
+ tasks.put(aid, aParts);
+ }
+ aParts.add((Integer) lData[1]);
+ }
p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId, plan,
- stageId, (Set<ActivityNodeId>) t2Data[1], globalPortMap);
+ stageId, tasks, opPartitions, globalPortMap);
p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
stageId);
ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId, stageId);
@@ -181,6 +238,37 @@
}
});
+ jobCleanUpTable.register(new JobCleanUpTable.Callback() {
+ @Override
+ public void deletion(TupleSet tuples) {
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void insertion(TupleSet tuples) {
+ try {
+ synchronized (JOLJobManagerImpl.this) {
+ for (Tuple t : tuples) {
+ Object[] data = t.toArray();
+ UUID jobId = (UUID) data[0];
+ Set<String> ts = (Set<String>) data[1];
+ ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
+ .size()];
+ int i = 0;
+ for (String n : ts) {
+ jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
+ }
+ ccs.runRemote(jcns, null);
+ BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable.createTuple(jobId));
+ jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
abortMessageTable.register(new AbortMessageTable.Callback() {
@Override
public void deletion(TupleSet tuples) {
@@ -224,11 +312,6 @@
}
@Override
- public void advanceJob(JobControl jobControlImpl) throws Exception {
-
- }
-
- @Override
public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
final UUID jobId = UUID.randomUUID();
@@ -270,16 +353,13 @@
BasicTupleSet olTuples = new BasicTupleSet();
for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : jobSpec.getOperatorMap().entrySet()) {
IOperatorDescriptor od = e.getValue();
- odTuples.add(OperatorDescriptorTable.createTuple(jobId, od));
PartitionConstraint pc = od.getPartitionConstraint();
LocationConstraint[] locationConstraints = pc.getLocationConstraints();
- String[] partitions = new String[locationConstraints.length];
+ int nPartitions = locationConstraints.length;
+ odTuples.add(OperatorDescriptorTable.createTuple(jobId, nPartitions, od));
for (int i = 0; i < locationConstraints.length; ++i) {
- String nodeId = ((AbsoluteLocationConstraint) locationConstraints[i]).getLocationId();
- olTuples.add(OperatorLocationTable.createTuple(jobId, od.getOperatorId(), nodeId));
- partitions[i] = nodeId;
+ addLocationConstraintTuple(olTuples, jobId, od.getOperatorId(), i, locationConstraints[i]);
}
- od.setPartitions(partitions);
od.contributeTaskGraph(gBuilder);
}
@@ -303,6 +383,21 @@
return jobId;
}
+ private void addLocationConstraintTuple(BasicTupleSet olTuples, UUID jobId, OperatorDescriptorId opId, int i,
+ LocationConstraint locationConstraint) {
+ switch (locationConstraint.getConstraintType()) {
+ case ABSOLUTE:
+ String nodeId = ((AbsoluteLocationConstraint) locationConstraint).getLocationId();
+ olTuples.add(OperatorLocationTable.createTuple(jobId, opId, nodeId, i));
+ break;
+
+ case CHOICE:
+ for (LocationConstraint lc : ((ChoiceLocationConstraint) locationConstraint).getChoices()) {
+ addLocationConstraintTuple(olTuples, jobId, opId, i, lc);
+ }
+ }
+ }
+
@Override
public JobStatus getJobStatus(UUID jobId) {
synchronized (jobTable) {
@@ -320,6 +415,12 @@
@Override
public void notifyNodeFailure(String nodeId) throws Exception {
+ BasicTupleSet unavailableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
+
+ jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, null, unavailableTuples);
+
+ jolRuntime.evaluate();
+
BasicTupleSet failedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, failedTuples, null);
@@ -339,6 +440,17 @@
}
@Override
+ public synchronized void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId)
+ throws Exception {
+ BasicTupleSet sfTuples = new BasicTupleSet();
+ sfTuples.add(StageletFailureTable.createTuple(jobId, stageId, nodeId, attempt));
+
+ jolRuntime.schedule(JOL_SCOPE, StageletFailureTable.TABLE_NAME, sfTuples, null);
+
+ jolRuntime.evaluate();
+ }
+
+ @Override
public synchronized void start(UUID jobId) throws Exception {
BasicTupleSet jsTuples = new BasicTupleSet();
jsTuples.add(JobStartTable.createTuple(jobId, System.currentTimeMillis()));
@@ -349,6 +461,15 @@
}
@Override
+ public void registerNode(String nodeId) throws Exception {
+ BasicTupleSet availableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
+
+ jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, availableTuples, null);
+
+ jolRuntime.evaluate();
+ }
+
+ @Override
public JobStatistics waitForCompletion(UUID jobId) throws Exception {
synchronized (jobTable) {
Tuple jobTuple = null;
@@ -416,20 +537,20 @@
@SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
- UUID.class, OperatorDescriptorId.class, IOperatorDescriptor.class
+ UUID.class, OperatorDescriptorId.class, Integer.class, IOperatorDescriptor.class
};
public OperatorDescriptorTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
}
- static Tuple createTuple(UUID jobId, IOperatorDescriptor od) {
- return new Tuple(jobId, od.getOperatorId(), od);
+ static Tuple createTuple(UUID jobId, int nPartitions, IOperatorDescriptor od) {
+ return new Tuple(jobId, od.getOperatorId(), nPartitions, od);
}
}
/*
- * declare(operatordescriptor, keys(0, 1), {JobId, ODId, OperatorDescriptor})
+ * declare(operatorlocation, keys(0, 1), {JobId, ODId, NodeId, Partition})
*/
private static class OperatorLocationTable extends BasicTable {
private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorlocation");
@@ -438,15 +559,15 @@
@SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
- UUID.class, OperatorDescriptorId.class, String.class
+ UUID.class, OperatorDescriptorId.class, String.class, Integer.class
};
public OperatorLocationTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
}
- static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId) {
- return new Tuple(jobId, opId, nodeId);
+ static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId, int partition) {
+ return new Tuple(jobId, opId, nodeId, partition);
}
}
@@ -599,6 +720,44 @@
}
/*
+ * declare(jobcleanup, keys(0), {JobId, Set<NodeId>})
+ */
+ private static class JobCleanUpTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanup");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, Set.class
+ };
+
+ public JobCleanUpTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+ }
+
+ /*
+ * declare(jobcleanupcomplete, keys(0), {JobId})
+ */
+ private static class JobCleanUpCompleteTable extends EventTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanupcomplete");
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class
+ };
+
+ public JobCleanUpCompleteTable() {
+ super(TABLE_NAME, SCHEMA);
+ }
+
+ public static Tuple createTuple(UUID jobId) {
+ return new Tuple(jobId);
+ }
+ }
+
+ /*
* declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt, StageletStatistics})
*/
private static class StageletCompleteTable extends BasicTable {
@@ -622,6 +781,50 @@
}
/*
+ * declare(stageletfailure, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
+ */
+ private static class StageletFailureTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletfailure");
+
+ private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, UUID.class, String.class, Integer.class
+ };
+
+ public StageletFailureTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) {
+ return new Tuple(jobId, stageId, nodeId, attempt);
+ }
+ }
+
+ /*
+ * declare(availablenodes, keys(0), {NodeId})
+ */
+ private static class AvailableNodesTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "availablenodes");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ String.class
+ };
+
+ public AvailableNodesTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ public static Tuple createTuple(String nodeId) {
+ return new Tuple(nodeId);
+ }
+ }
+
+ /*
* declare(failednodes, keys(0), {NodeId})
*/
private static class FailedNodesTable extends BasicTable {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobControl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobControl.java
deleted file mode 100644
index a7e2fc1..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobControl.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.controller.clustercontroller;
-
-import java.rmi.RemoteException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-
-public class JobControl {
- private static final long serialVersionUID = 1L;
-
- private final IJobManager jobManager;
-
- private final JobPlan jobPlan;
-
- private final UUID jobId;
-
- private final Map<UUID, StageProgress> stageProgressMap;
-
- private final Set<UUID> completeStages;
-
- private JobStatus jobStatus;
-
- private JobStatistics jobStatistics;
-
- public JobControl(IJobManager jobManager, JobPlan jobPlan) throws RemoteException {
- this.jobManager = jobManager;
- this.jobPlan = jobPlan;
- jobId = UUID.randomUUID();
- stageProgressMap = new HashMap<UUID, StageProgress>();
- completeStages = new HashSet<UUID>();
- jobStatus = JobStatus.INITIALIZED;
- jobStatistics = new JobStatistics();
- }
-
- public JobPlan getJobPlan() {
- return jobPlan;
- }
-
- public UUID getJobId() {
- return jobId;
- }
-
- public synchronized JobStatus getJobStatus() {
- return jobStatus;
- }
-
- public Set<UUID> getCompletedStages() {
- return completeStages;
- }
-
- public void setStatus(JobStatus status) {
- jobStatus = status;
- }
-
- public StageProgress getStageProgress(int stageId) {
- return stageProgressMap.get(stageId);
- }
-
- public void setStageProgress(UUID stageId, StageProgress stageProgress) {
- stageProgressMap.put(stageId, stageProgress);
- }
-
- public synchronized void notifyStageletComplete(UUID stageId, String nodeId, StageletStatistics ss)
- throws Exception {
- StageProgress stageProgress = stageProgressMap.get(stageId);
- stageProgress.markNodeComplete(nodeId);
- StageStatistics stageStatistics = stageProgress.getStageStatistics();
- stageStatistics.addStageletStatistics(ss);
- if (stageProgress.stageComplete()) {
- jobStatistics.addStageStatistics(stageStatistics);
- stageProgressMap.remove(stageId);
- completeStages.add(stageId);
- jobManager.advanceJob(this);
- }
- }
-
- public synchronized JobStatistics waitForCompletion() throws Exception {
- while (jobStatus != JobStatus.TERMINATED) {
- wait();
- }
- return jobStatistics;
- }
-
- public synchronized void notifyJobComplete() {
- jobStatus = JobStatus.TERMINATED;
- notifyAll();
- }
-
- public JobStatistics getJobStatistics() {
- return jobStatistics;
- }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
deleted file mode 100644
index fc902b2..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobManagerImpl.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.controller.clustercontroller;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.Endpoint;
-import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStage;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-
-public class JobManagerImpl implements IJobManager {
- private static final Logger LOGGER = Logger.getLogger(JobManagerImpl.class.getName());
- private ClusterControllerService ccs;
-
- private final Map<UUID, JobControl> jobMap;
-
- private final IJobPlanner planner;
-
- public JobManagerImpl(ClusterControllerService ccs) {
- this.ccs = ccs;
- jobMap = new HashMap<UUID, JobControl>();
- planner = new NaiveJobPlannerImpl();
- }
-
- public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- JobPlanner planner = new JobPlanner();
- JobControl jc = new JobControl(this, planner.plan(jobSpec, jobFlags));
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(jc.getJobPlan().toString());
- }
- jobMap.put(jc.getJobId(), jc);
-
- return jc.getJobId();
- }
-
- public synchronized void start(UUID jobId) throws Exception {
- JobControl jobControlImpl = jobMap.get(jobId);
- LOGGER
- .info("Starting job: " + jobControlImpl.getJobId() + ", Current status: " + jobControlImpl.getJobStatus());
- if (jobControlImpl.getJobStatus() != JobStatus.INITIALIZED) {
- return;
- }
- jobControlImpl.getJobStatistics().setStartTime(new Date());
- jobControlImpl.setStatus(JobStatus.RUNNING);
- schedule(jobControlImpl);
- }
-
- public synchronized void advanceJob(JobControl jobControlImpl) throws Exception {
- schedule(jobControlImpl);
- }
-
- private void schedule(JobControl jobControlImpl) throws Exception {
- JobPlan plan = jobControlImpl.getJobPlan();
- JobStage endStage = plan.getEndStage();
-
- Set<UUID> completedStages = jobControlImpl.getCompletedStages();
- List<JobStage> runnableStages = new ArrayList<JobStage>();
- findRunnableStages(endStage, runnableStages, completedStages, new HashSet<UUID>());
- if (runnableStages.size() == 1 && runnableStages.get(0).getTasks().isEmpty()) {
- LOGGER.info("Job " + jobControlImpl.getJobId() + " complete");
- jobControlImpl.getJobStatistics().setEndTime(new Date());
- cleanUp(jobControlImpl);
- jobControlImpl.notifyJobComplete();
- } else {
- for (JobStage s : runnableStages) {
- if (s.isStarted()) {
- continue;
- }
- startStage(jobControlImpl, s);
- }
- }
- }
-
- private void cleanUp(JobControl jc) {
- jobMap.remove(jc.getJobId());
- ccs.notifyJobComplete(jc.getJobId());
- }
-
- private void startStage(JobControl jc, JobStage stage) throws Exception {
- stage.setStarted();
- Set<String> candidateNodes = deploy(jc, stage);
- for (String nodeId : candidateNodes) {
- ccs.lookupNode(nodeId).getNodeController().startStage(jc.getJobId(), stage.getId());
- }
- }
-
- private void findRunnableStages(JobStage s, List<JobStage> runnableStages, Set<UUID> completedStages, Set<UUID> seen) {
- boolean runnable = true;
- if (seen.contains(s.getId())) {
- return;
- }
- seen.add(s.getId());
- for (JobStage dep : s.getDependencies()) {
- boolean depComplete = completedStages.contains(dep.getId());
- runnable = runnable && depComplete;
- if (!depComplete) {
- findRunnableStages(dep, runnableStages, completedStages, seen);
- }
- }
- if (runnable) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Runnable stage: " + s);
- }
- runnableStages.add(s);
- }
- }
-
- private Set<String> deploy(JobControl jc, JobStage stage) throws Exception {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Deploying: " + stage);
- }
- UUID jobId = jc.getJobId();
- JobPlan plan = jc.getJobPlan();
- UUID stageId = stage.getId();
- Set<String> participatingNodes = plan(jc, stage);
- StageProgress stageProgress = new StageProgress(stage.getId());
- stageProgress.addPendingNodes(participatingNodes);
- ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[participatingNodes
- .size()];
- ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[participatingNodes
- .size()];
- ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[participatingNodes
- .size()];
- int i = 0;
- for (String nodeId : participatingNodes) {
- p1is[i++] = new ClusterControllerService.Phase1Installer(nodeId, jobId, plan, stageId, 0, stage.getTasks());
- }
- Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
- new ClusterControllerService.PortMapMergingAccumulator());
- i = 0;
- for (String nodeId : participatingNodes) {
- p2is[i++] = new ClusterControllerService.Phase2Installer(nodeId, jobId, plan, stageId, stage.getTasks(),
- globalPortMap);
- }
- ccs.runRemote(p2is, null);
- i = 0;
- for (String nodeId : participatingNodes) {
- p3is[i++] = new ClusterControllerService.Phase3Installer(nodeId, jobId, stageId);
- }
- ccs.runRemote(p3is, null);
- jc.setStageProgress(stage.getId(), stageProgress);
- return participatingNodes;
- }
-
- private Set<String> plan(JobControl jc, JobStage stage) throws Exception {
- LOGGER.log(Level.INFO, String.valueOf(jc.getJobId()) + ": Planning");
- Set<String> participatingNodes = planner.plan(jc, stage);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(stage + " Participating nodes: " + participatingNodes);
- }
- return participatingNodes;
- }
-
- @Override
- public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
- StageletStatistics statistics) throws Exception {
- JobControl jc = jobMap.get(jobId);
- if (jc != null) {
- jc.notifyStageletComplete(stageId, nodeId, statistics);
- }
- }
-
- @Override
- public synchronized JobStatus getJobStatus(UUID jobId) {
- JobControl jc = jobMap.get(jobId);
- return jc.getJobStatus();
- }
-
- @Override
- public JobStatistics waitForCompletion(UUID jobId) throws Exception {
- JobControl jc;
- synchronized (this) {
- jc = jobMap.get(jobId);
- }
- if (jc != null) {
- return jc.waitForCompletion();
- }
- return null;
- }
-
- @Override
- public synchronized void notifyNodeFailure(String nodeId) {
- }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NaiveJobPlannerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NaiveJobPlannerImpl.java
deleted file mode 100644
index f3ccfee..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NaiveJobPlannerImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package edu.uci.ics.hyracks.controller.clustercontroller;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.job.JobStage;
-import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
-
-public class NaiveJobPlannerImpl implements IJobPlanner {
- @Override
- public Set<String> plan(JobControl jc, JobStage stage) throws Exception {
- final Set<OperatorDescriptorId> opSet = new HashSet<OperatorDescriptorId>();
- for (ActivityNodeId t : stage.getTasks()) {
- opSet.add(jc.getJobPlan().getActivityNodeMap().get(t).getOwner().getOperatorId());
- }
-
- final Set<String> candidateNodes = new HashSet<String>();
-
- IOperatorDescriptorVisitor visitor = new IOperatorDescriptorVisitor() {
- @Override
- public void visit(IOperatorDescriptor op) throws Exception {
- if (!opSet.contains(op.getOperatorId())) {
- return;
- }
- String[] partitions = op.getPartitions();
- if (partitions == null) {
- PartitionConstraint pc = op.getPartitionConstraint();
- LocationConstraint[] lcs = pc.getLocationConstraints();
- String[] assignment = new String[lcs.length];
- for (int i = 0; i < lcs.length; ++i) {
- String nodeId = ((AbsoluteLocationConstraint) lcs[i]).getLocationId();
- assignment[i] = nodeId;
- }
- op.setPartitions(assignment);
- partitions = assignment;
- }
- for (String p : partitions) {
- candidateNodes.add(p);
- }
- }
- };
-
- PlanUtils.visit(jc.getJobPlan().getJobSpecification(), visitor);
- return candidateNodes;
- }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
index 2269e1f..7f0a44e 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
@@ -91,4 +91,13 @@
stageletMap.remove(stageId);
nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
}
+
+ /**
+  * Removes the entry for {@code stageId} from {@code stageletMap} and then reports the
+  * failure, together with this joblet's job id, to the node controller.
+  *
+  * @param stageId id of the stage whose entry is removed and reported
+  * @param attempt attempt number passed through unchanged to the node controller
+  * @throws Exception if the node controller's {@code notifyStageFailed} call throws
+  */
+ public void notifyStageletFailed(UUID stageId, int attempt) throws Exception {
+     this.stageletMap.remove(stageId);
+     this.nodeController.notifyStageFailed(this.jobId, stageId, attempt);
+ }
+
+ /**
+  * @return the node controller service held by this joblet
+  */
+ public NodeControllerService getNodeController() {
+     return this.nodeController;
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index 7b595b7..c202b3d 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -49,6 +49,7 @@
import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -111,7 +112,7 @@
this.nodeParameters = cc.registerNode(this);
// Schedule heartbeat generator.
- timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod() * 1000);
+ timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod());
LOGGER.log(Level.INFO, "Started NodeControllerService");
}
@@ -124,7 +125,7 @@
}
@Override
- public String getId() throws Exception {
+ public String getId() {
return id;
}
@@ -162,7 +163,7 @@
@Override
public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
- Set<ActivityNodeId> activities) throws Exception {
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) throws Exception {
LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
final Joblet joblet = getLocalJoblet(jobId);
@@ -175,43 +176,43 @@
List<Endpoint> endpointList = new ArrayList<Endpoint>();
- for (ActivityNodeId hanId : activities) {
+ for (ActivityNodeId hanId : tasks.keySet()) {
IActivityNode han = plan.getActivityNodeMap().get(hanId);
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Initializing " + hanId + " -> " + han);
}
IOperatorDescriptor op = han.getOwner();
List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
- String[] partitions = op.getPartitions();
- for (int i = 0; i < partitions.length; ++i) {
- String part = partitions[i];
- if (id.equals(part)) {
- IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i);
- OperatorRunnable or = new OperatorRunnable(ctx, hon);
- stagelet.setOperator(op.getOperatorId(), i, or);
- if (inputs != null) {
- for (int j = 0; j < inputs.size(); ++j) {
- if (j >= 1) {
- throw new IllegalStateException();
- }
- IConnectorDescriptor conn = inputs.get(j);
- Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
- endpointList.add(endpoint);
- DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId,
- stageId);
- connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
- PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
- .getTaskInputMap().get(hanId).get(j), i);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
- }
- portMap.put(piId, endpoint);
- IFrameReader reader = createReader(conn, drlf, i, plan, stagelet);
- or.setFrameReader(reader);
+ for (int i : tasks.get(hanId)) {
+ IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i);
+ OperatorRunnable or = new OperatorRunnable(ctx, hon);
+ stagelet.setOperator(op.getOperatorId(), i, or);
+ if (inputs != null) {
+ for (int j = 0; j < inputs.size(); ++j) {
+ if (j >= 1) {
+ throw new IllegalStateException();
}
+ IConnectorDescriptor conn = inputs.get(j);
+ OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+ .getOperatorId();
+ OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+ .getOperatorId();
+ Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
+ endpointList.add(endpoint);
+ DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId, stageId);
+ connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
+ PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
+ .getTaskInputMap().get(hanId).get(j), i);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
+ }
+ portMap.put(piId, endpoint);
+ IFrameReader reader = createReader(conn, drlf, i, plan, stagelet, opPartitions
+ .get(producerOpId).size(), opPartitions.get(consumerOpId).size());
+ or.setFrameReader(reader);
}
- honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
}
+ honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
}
}
@@ -221,8 +222,10 @@
}
private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
- final int receiverIndex, JobPlan plan, final Stagelet stagelet) throws HyracksDataException {
- final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex);
+ final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
+ throws HyracksDataException {
+ final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex, nProducerCount,
+ nConsumerCount);
return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
private int frameCount;
@@ -253,7 +256,8 @@
}
@Override
- public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId, Set<ActivityNodeId> activities,
+ public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId,
+ Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
final Joblet ji = getLocalJoblet(jobId);
@@ -264,37 +268,38 @@
final JobSpecification spec = plan.getJobSpecification();
- for (ActivityNodeId hanId : activities) {
+ for (ActivityNodeId hanId : tasks.keySet()) {
IActivityNode han = plan.getActivityNodeMap().get(hanId);
IOperatorDescriptor op = han.getOwner();
List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
- String[] partitions = op.getPartitions();
- for (int i = 0; i < partitions.length; ++i) {
- String part = partitions[i];
- if (id.equals(part)) {
- OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
- if (outputs != null) {
- for (int j = 0; j < outputs.size(); ++j) {
- final IConnectorDescriptor conn = outputs.get(j);
- final int senderIndex = i;
- IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
- @Override
- public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
- PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
- Direction.INPUT, spec.getConsumerInputIndex(conn), index);
- Endpoint ep = globalPortMap.get(piId);
- if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
- }
- return createWriter(connectionManager.connect(ep.getNetworkAddress(), ep
- .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+ for (int i : tasks.get(hanId)) {
+ OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
+ if (outputs != null) {
+ for (int j = 0; j < outputs.size(); ++j) {
+ final IConnectorDescriptor conn = outputs.get(j);
+ OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+ .getOperatorId();
+ OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+ .getOperatorId();
+ final int senderIndex = i;
+ IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
+ PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
+ Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+ Endpoint ep = globalPortMap.get(piId);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
}
- };
- or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i));
- }
+ return createWriter(connectionManager.connect(ep.getNetworkAddress(), ep
+ .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+ }
+ };
+ or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i, opPartitions.get(
+ producerOpId).size(), opPartitions.get(consumerOpId).size()));
}
- stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
}
+ stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
}
}
}
@@ -373,6 +378,10 @@
ccs.notifyStageletComplete(jobId, stageId, attempt, id, stats);
}
+ /**
+  * Forwards a stage failure to the cluster controller, adding this node's id to the report.
+  *
+  * @param jobId id of the job the failed stage belongs to
+  * @param stageId id of the failed stage
+  * @param attempt attempt number passed through unchanged
+  * @throws Exception if the cluster controller's {@code notifyStageletFailure} call throws
+  */
+ public void notifyStageFailed(UUID jobId, UUID stageId, int attempt) throws Exception {
+     this.ccs.notifyStageletFailure(jobId, stageId, attempt, this.id);
+ }
+
@Override
public void notifyRegistration(IClusterController ccs) throws Exception {
this.ccs = ccs;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
index 8c0b830..73667ed 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -117,11 +117,13 @@
+ opIId.getOperatorId() + ":" + opIId.getPartition());
} catch (Exception e) {
e.printStackTrace();
+ notifyOperatorFailure(opIId);
}
try {
hon.run();
- } finally {
notifyOperatorCompletion(opIId);
+ } catch (Exception e) {
+ notifyOperatorFailure(opIId);
}
}
});
@@ -139,6 +141,15 @@
}
}
+ /**
+  * Handles the failure of one operator instance: aborts this stagelet, then tells the joblet
+  * that the stagelet failed for the current stage id and attempt.
+  *
+  * An exception from the joblet notification is printed and not rethrown.
+  *
+  * @param opIId the operator instance that failed (not used by the body)
+  */
+ protected synchronized void notifyOperatorFailure(OperatorInstanceId opIId) {
+     this.abort();
+     try {
+         // NOTE(review): this callback runs while the stagelet's monitor is held (the method
+         // is synchronized) - confirm that is intended.
+         this.joblet.notifyStageletFailed(this.stageId, this.attempt);
+     } catch (Exception failure) {
+         failure.printStackTrace();
+     }
+ }
+
private synchronized void waitUntilStarted() throws InterruptedException {
while (!started && !abort) {
wait();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
index 937e0a4..5013812 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
@@ -37,17 +37,16 @@
@Override
public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
JobSpecification spec = plan.getJobSpecification();
- final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
- final HashDataWriter hashWriter = new HashDataWriter(ctx, consumerPartitionCount, edwFactory, spec
- .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
+ final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, spec
+ .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
return hashWriter;
}
@Override
public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
index 9f36730..28cd1fa 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
@@ -36,7 +36,7 @@
private final IBinaryComparatorFactory[] comparatorFactories;
public MToNHashPartitioningMergingConnectorDescriptor(JobSpecification spec, ITuplePartitionComputerFactory tpcf,
- int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
+ int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
super(spec);
this.tpcf = tpcf;
this.sortFields = sortFields;
@@ -45,19 +45,18 @@
@Override
public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
JobSpecification spec = plan.getJobSpecification();
- final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
- final HashDataWriter hashWriter = new HashDataWriter(ctx, consumerPartitionCount, edwFactory, spec
- .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
+ final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, spec
+ .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
return hashWriter;
}
@Override
- public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux, int index)
- throws HyracksDataException {
+ public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for(int i = 0; i < comparatorFactories.length; ++i) {
+ for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
JobSpecification spec = plan.getJobSpecification();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
index f455f01..d3a8af9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
@@ -41,7 +41,7 @@
private final FrameTupleAccessor tupleAccessor;
public RangeDataWriter(HyracksContext ctx, int consumerPartitionCount, IFrameWriter[] epWriters,
- FrameTupleAppender[] appenders, RecordDescriptor recordDescriptor) {
+ FrameTupleAppender[] appenders, RecordDescriptor recordDescriptor) {
this.consumerPartitionCount = consumerPartitionCount;
this.epWriters = epWriters;
this.appenders = appenders;
@@ -106,12 +106,11 @@
@Override
public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
JobSpecification spec = plan.getJobSpecification();
- final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
- final IFrameWriter[] epWriters = new IFrameWriter[consumerPartitionCount];
- final FrameTupleAppender[] appenders = new FrameTupleAppender[consumerPartitionCount];
- for (int i = 0; i < consumerPartitionCount; ++i) {
+ final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
+ final FrameTupleAppender[] appenders = new FrameTupleAppender[nConsumerPartitions];
+ for (int i = 0; i < nConsumerPartitions; ++i) {
try {
epWriters[i] = edwFactory.createFrameWriter(i);
appenders[i] = new FrameTupleAppender(ctx);
@@ -120,14 +119,14 @@
throw new HyracksDataException(e);
}
}
- final RangeDataWriter rangeWriter = new RangeDataWriter(ctx, consumerPartitionCount, epWriters, appenders, spec
- .getConnectorRecordDescriptor(this));
+ final RangeDataWriter rangeWriter = new RangeDataWriter(ctx, nConsumerPartitions, epWriters, appenders, spec
+ .getConnectorRecordDescriptor(this));
return rangeWriter;
}
@Override
public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
index 50d804a..324c39f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
@@ -36,11 +36,9 @@
@Override
public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index) throws HyracksDataException {
- JobSpecification spec = plan.getJobSpecification();
- final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
- final IFrameWriter[] epWriters = new IFrameWriter[consumerPartitionCount];
- for (int i = 0; i < consumerPartitionCount; ++i) {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
+ for (int i = 0; i < nConsumerPartitions; ++i) {
epWriters[i] = edwFactory.createFrameWriter(i);
}
return new IFrameWriter() {
@@ -73,7 +71,7 @@
@Override
public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
index c46026a..fff3aa6 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
@@ -34,13 +34,13 @@
@Override
public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
return edwFactory.createFrameWriter(index);
}
@Override
public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
- int index) throws HyracksDataException {
+ int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java
index b411020..2e610a9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java
@@ -74,16 +74,6 @@
}
@Override
- public String[] getPartitions() {
- return partitions;
- }
-
- @Override
- public void setPartitions(String[] partitions) {
- this.partitions = partitions;
- }
-
- @Override
public RecordDescriptor[] getOutputRecordDescriptors() {
return recordDescriptors;
}
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index ba7d0f7..b120ba88 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -76,54 +76,93 @@
watch(jobstage, a);
+define(jobattempt, keys(), {UUID, Integer});
+
+jobattempt(JobId, 0) :-
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
+ jobstart(JobId, _);
+
+jobattempt(JobId, NextAttempt) :-
+ jobattempt(JobId, Attempt),
+ stagestart(JobId, _, Attempt),
+ abortcomplete(JobId, _, Attempt)
+ {
+ NextAttempt := Attempt + 1;
+ };
+
define(stagestart, keys(), {UUID, Integer, Integer});
-define(stagefinish, keys(0, 1), {UUID, Integer, Set});
+define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Set});
watch(jobstart, i);
-stagestart_INITIAL stagestart(JobId, 0, 0) :-
- jobstart(JobId, _),
- job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
- notin stagestart(JobId, _, _);
+stagestart_INITIAL stagestart(JobId, 0, Attempt) :-
+ jobattempt#insert(JobId, Attempt);
update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
jobstart(JobId, _);
-stagestart_NEXT stagestart(JobId, NextStageNumber, 0) :-
- stagestart(JobId, StageNumber, _),
- stagefinish#insert(StageId, StageNumber, _)
+/* Start the next stage of a job when the current stage finishes in the same attempt.
+   stagefinish tuples are (JobId, StageNumber, Attempt, Set), so the first column is
+   joined on JobId: leaving it as a free variable would let any job's stage completion
+   advance every job that is at the same stage number and attempt. */
+stagestart_NEXT stagestart(JobId, NextStageNumber, Attempt) :-
+ stagestart(JobId, StageNumber, Attempt),
+ stagefinish#insert(JobId, StageNumber, Attempt, _)
{
NextStageNumber := StageNumber + 1;
};
-stagestart_AGAIN stagestart(JobId, StageNumber, NextAttempt) :-
- stagestart(JobId, StageNumber, Attempt),
- abortcomplete(JobId, StageId, Attempt),
- jobstage(JobId, StageNumber, StageId)
- {
- NextAttempt := Attempt + 1;
- };
-
watch(stagestart, a);
watch(stagestart, d);
-define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String});
+define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, String});
-activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId) :-
- stagestart(JobId, StageNumber, Attempt),
- operatordescriptor(JobId, OperatorId, _),
+operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit) :-
+ operatorlocation(JobId, OperatorId, NodeId, Partition),
+ availablenodes(NodeId)
+ {
+ Benefit := NodeId;
+ };
+
+watch(operatorlocationcandidates, a);
+watch(operatorlocationcandidates, i);
+watch(operatorlocationcandidates, d);
+
+define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, String});
+
+maxoperatorlocationbenefit(JobId, OperatorId, Partition, max<Benefit>) :-
+ operatorlocationcandidates(JobId, OperatorId, _, Partition, Benefit);
+
+watch(maxoperatorlocationbenefit, a);
+watch(maxoperatorlocationbenefit, i);
+watch(maxoperatorlocationbenefit, d);
+
+define(operatorlocationdecision, keys(0, 1, 3), {UUID, OperatorDescriptorId, String, Integer});
+
+watch(operatorlocationdecision, a);
+watch(operatorlocationdecision, i);
+watch(operatorlocationdecision, d);
+
+operatorlocationdecision(JobId, OperatorId, NodeId, Partition) :-
+ operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit),
+ maxoperatorlocationbenefit(JobId, OperatorId, Partition, Benefit);
+
+define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
+
+activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
+ stagestart#insert(JobId, StageNumber, Attempt),
+ operatordescriptor(JobId, OperatorId, _, _),
activitystage(JobId, OperatorId, ActivityId, StageNumber),
jobstage(JobId, StageNumber, StageId),
- operatorlocation(JobId, OperatorId, NodeId);
+ operatorlocationdecision(JobId, OperatorId, NodeId, Partition);
watch(activitystart, a);
define(stageletstart, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
-stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, set<ActivityId>) :-
- activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId),
- job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _);
+stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, set<ActivityInfo>) :-
+ activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition),
+ job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _)
+ {
+ ActivityInfo := [ActivityId, Partition];
+ };
watch(stageletstart, a);
watch(stageletstart, i);
@@ -131,9 +170,11 @@
define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
startmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
- stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet)
+ stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityInfoSet),
+ availablenodes(NodeId),
+ ActivityInfoSet.size() != 0
{
- Tuple := [NodeId, ActivityIdSet];
+ Tuple := [NodeId, ActivityInfoSet];
};
startmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
@@ -145,18 +186,29 @@
define(stageletabort, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
- stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
- failednodes(NodeId);
+ stageletfailure(JobId, StageId, NodeId, Attempt),
+ stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet);
-stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
- stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
- stageletabort(JobId, StageId, _, _, Attempt, _);
+stageletabort(JobId, StageId, JobPlan, NodeIdOther, Attempt, ActivityIdSet) :-
+ stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, _),
+ stageletstart(JobId, StageId, _, NodeIdOther, Attempt, ActivityIdSet),
+ failednodes(NodeId),
+ notin stageletcomplete(JobId, StageId, NodeId, Attempt, _);
+
+watch(stageletabort, a);
+watch(stageletabort, i);
+watch(stageletabort, d);
+
+define(stageabort, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageabort(JobId, StageId, Attempt, set<NodeId>) :-
+ stageletabort(JobId, StageId, _, NodeId, Attempt, _);
define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
- notin failednodes(NodeId)
+ availablenodes(NodeId)
{
Tuple := [NodeId, ActivityIdSet];
};
@@ -167,24 +219,33 @@
watch(abortmessage, a);
watch(abortmessage, i);
-define(abortnotify_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+define(stageletabortcomplete, keys(), {UUID, UUID, String, Integer});
-abortnotify_agg(JobId, StageId, Attempt, set<NodeId>) :-
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
abortnotify(JobId, StageId, NodeId, Attempt);
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+ stageletabort(JobId, StageId, _, NodeId, Attempt, _),
+ failednodes(NodeId);
+
+define(stageletabortcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletabortcomplete_agg(JobId, StageId, Attempt, set<NodeId>) :-
+ stageletabortcomplete(JobId, StageId, NodeId, Attempt);
+
define(abortcomplete, keys(), {UUID, UUID, Integer});
abortcomplete(JobId, StageId, Attempt) :-
- abortnotify_agg(JobId, StageId, Attempt, NodeIdSet),
- abortmessage_agg(JobId, StageId, Attempt, _, TSet),
- TSet.size() == NodeIdSet.size();
+ stageletabortcomplete_agg(JobId, StageId, Attempt, NodeIdSet1),
+ stageabort(JobId, StageId, Attempt, NodeIdSet2),
+ NodeIdSet1.size() == NodeIdSet2.size();
define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
-stagefinish(JobId, StageNumber, SSet) :-
+stagefinish(JobId, StageNumber, Attempt, SSet) :-
startmessage_agg(JobId, StageId, Attempt, _, TSet),
stageletcomplete_agg(JobId, StageId, Attempt, SSet),
jobstage(JobId, StageNumber, StageId),
@@ -192,6 +253,17 @@
update_job_status_TERMINATED job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
- stagestart#insert(JobId, StageNumber),
- stagefinish(JobId, _, SSet),
- notin jobstage(JobId, StageNumber);
\ No newline at end of file
+ stagestart#insert(JobId, StageNumber, Attempt),
+ stagefinish(JobId, _, Attempt, SSet),
+ notin jobstage(JobId, StageNumber);
+
+define(jobcleanup_agg, {UUID, Set});
+
+/* Collect, per job, the set of nodes that any operator partition was placed on.
+   Fires when a stagestart is inserted for a StageNumber that has no jobstage entry
+   and a stagefinish exists for the same attempt.
+   operatorlocationdecision tuples are (JobId, OperatorId, NodeId, Partition); the
+   last column is a partition index, not an attempt, so it is left unconstrained.
+   Joining it on Attempt would only pick nodes whose partition index happens to
+   equal the attempt number.
+   NOTE(review): jobstage is matched with three columns in the activitystart rule;
+   confirm the two-argument form used below is intended. */
+jobcleanup_agg(JobId, set<NodeId>) :-
+ stagestart#insert(JobId, StageNumber, Attempt),
+ stagefinish(JobId, _, Attempt, _),
+ operatorlocationdecision(JobId, _, NodeId, _),
+ notin jobstage(JobId, StageNumber);
+
+jobcleanup(JobId, NodeIdSet) :-
+ jobcleanup_agg(JobId, NodeIdSet);
\ No newline at end of file