Refactored cluster controller scheduler.

Replaced the IJobScheduler, IJobRunStateMachine, and IActivityClusterStateMachine interfaces with concrete JobScheduler, JobRunStateMachine, and ActivityClusterStateMachine classes; moved activity-cluster inference into a new ActivityClusterGraphBuilder; tracked in-progress task clusters on ActivityCluster itself; and introduced a workflow-variable API (MultipartName, WorkflowVariableDescriptor, WorkflowVariableValueScope, plus lookup/set methods on IHyracksTaskContext).
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@492 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
index 8584d36..7609951 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
@@ -14,7 +14,9 @@
*/
package edu.uci.ics.hyracks.api.context;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
@@ -25,4 +27,12 @@
public TaskAttemptId getTaskAttemptId();
public ICounterContext getCounterContext();
+
+ public Object lookupGlobalVariable(ActivityId producerActivity, int partition, String varName)
+ throws HyracksDataException;
+
+ public Object lookupLocalVariable(ActivityId producerActivity, int partition, String varName)
+ throws HyracksDataException;
+
+ public void setVariable(String name, Object value) throws HyracksDataException;
}
\ No newline at end of file
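Reviewer note (not part of the patch): the three new methods let a task publish a value under a name and read values published by upstream activities. A minimal usage sketch against the patched interface; the variable names, the producer ActivityId, and the global/local visibility semantics are assumptions taken from the method names.

```java
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

public class TaskVariableSketch {
    // Illustrative only: publish a value under this task's name, then read a
    // value that an upstream activity set for the same partition.
    public static Object readUpstreamValue(IHyracksTaskContext ctx, ActivityId producer, int partition)
            throws HyracksDataException {
        ctx.setVariable("rowCount", Integer.valueOf(42)); // hypothetical variable name
        return ctx.lookupGlobalVariable(producer, partition, "histogram"); // hypothetical variable name
    }
}
```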
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/naming/MultipartName.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/naming/MultipartName.java
new file mode 100644
index 0000000..7310606
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/naming/MultipartName.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.naming;
+
+import java.util.Arrays;
+
+public final class MultipartName {
+ private Object[] parts;
+
+ public MultipartName(Object... parts) {
+ this.parts = parts;
+ }
+
+ public Object[] getParts() {
+ return parts;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.deepHashCode(parts);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof MultipartName)) {
+ return false;
+ }
+ return Arrays.deepEquals(parts, ((MultipartName) o).parts);
+ }
+}
\ No newline at end of file
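Reviewer note (not part of the patch): because MultipartName delegates to Arrays.deepHashCode/deepEquals, names compare by the values of their parts, which makes distinct instances usable as keys into a variable table. A small self-contained check:

```java
import java.util.HashMap;
import java.util.Map;

import edu.uci.ics.hyracks.api.naming.MultipartName;

public class MultipartNameDemo {
    public static void main(String[] args) {
        Map<MultipartName, String> table = new HashMap<MultipartName, String>();
        table.put(new MultipartName("job1", Integer.valueOf(3), "count"), "forty-two");
        // A distinct instance with equal parts hits the same entry.
        System.out.println(table.get(new MultipartName("job1", Integer.valueOf(3), "count"))); // forty-two
    }
}
```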
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java
index 9adf482..5d42ce9 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java
@@ -31,7 +31,7 @@
@Override
public String toString() {
- return "TID: " + id;
+ return "ThID: " + id;
}
public int hashCode() {
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
index 1451c69..1185b5e 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
@@ -34,6 +34,9 @@
}
public static Object deserialize(byte[] bytes, ClassLoader classLoader) throws IOException, ClassNotFoundException {
+ if (bytes == null) {
+ return null;
+ }
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(classLoader);
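Reviewer note (not part of the patch): the new guard makes deserialize null-tolerant, likely motivated by the new nullable argument passed to startTasks further down in this patch (an assumption; the caller is not shown deserializing it). A quick round-trip using the serialize helper this class already exposes:

```java
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;

public class NullSafeDeserializeDemo {
    public static void main(String[] args) throws Exception {
        ClassLoader cl = NullSafeDeserializeDemo.class.getClassLoader();
        // Previously a null payload would have failed inside ObjectInputStream; now it is a no-op.
        System.out.println(JavaSerializationUtils.deserialize(null, cl)); // null
        byte[] bytes = JavaSerializationUtils.serialize("hello");
        System.out.println(JavaSerializationUtils.deserialize(bytes, cl)); // hello
    }
}
```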
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/workflow/variables/WorkflowVariableDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/workflow/variables/WorkflowVariableDescriptor.java
new file mode 100644
index 0000000..b13afe4
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/workflow/variables/WorkflowVariableDescriptor.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.workflow.variables;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.naming.MultipartName;
+
+public final class WorkflowVariableDescriptor implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final MultipartName name;
+
+ private final WorkflowVariableValueScope scope;
+
+ public WorkflowVariableDescriptor(MultipartName name, WorkflowVariableValueScope scope) {
+ this.name = name;
+ this.scope = scope;
+ }
+
+ public MultipartName getName() {
+ return name;
+ }
+
+ public WorkflowVariableValueScope getScope() {
+ return scope;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobScheduler.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/workflow/variables/WorkflowVariableValueScope.java
similarity index 70%
rename from hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobScheduler.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/workflow/variables/WorkflowVariableValueScope.java
index d359731..44ed129 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobScheduler.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/workflow/variables/WorkflowVariableValueScope.java
@@ -12,11 +12,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.cc.scheduler;
+package edu.uci.ics.hyracks.api.workflow.variables;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
-
-public interface IJobScheduler {
- public void notifyJobCreation(JobRun run) throws HyracksException;
+public enum WorkflowVariableValueScope {
+ LOCAL,
+ GLOBAL,
}
\ No newline at end of file
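Reviewer note (not part of the patch): WorkflowVariableDescriptor pairs a MultipartName with one of the two scopes above. Constructing one is trivial; the visibility rules implied by LOCAL vs. GLOBAL are an assumption here, since this patch only introduces the types and the task-context lookup methods:

```java
import edu.uci.ics.hyracks.api.naming.MultipartName;
import edu.uci.ics.hyracks.api.workflow.variables.WorkflowVariableDescriptor;
import edu.uci.ics.hyracks.api.workflow.variables.WorkflowVariableValueScope;

public class VariableDescriptorDemo {
    public static void main(String[] args) {
        WorkflowVariableDescriptor d = new WorkflowVariableDescriptor(
                new MultipartName("histogram"), WorkflowVariableValueScope.GLOBAL); // hypothetical name
        System.out.println(d.getScope()); // GLOBAL
    }
}
```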
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 79d709e..e0c3ee1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -60,8 +60,7 @@
import edu.uci.ics.hyracks.control.cc.job.manager.events.UnregisterNodeEvent;
import edu.uci.ics.hyracks.control.cc.jobqueue.FutureValue;
import edu.uci.ics.hyracks.control.cc.jobqueue.JobQueue;
-import edu.uci.ics.hyracks.control.cc.scheduler.DefaultJobScheduler;
-import edu.uci.ics.hyracks.control.cc.scheduler.IJobScheduler;
+import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
@@ -100,7 +99,7 @@
private final JobQueue jobQueue;
- private final IJobScheduler scheduler;
+ private final JobScheduler scheduler;
private final Executor taskExecutor;
@@ -121,7 +120,7 @@
webServer = new WebServer(this);
runMap = new HashMap<UUID, JobRun>();
jobQueue = new JobQueue();
- scheduler = new DefaultJobScheduler(this);
+ scheduler = new JobScheduler(this);
this.timer = new Timer(true);
ccci = new CCClientInterface(this);
ccContext = new ICCContext() {
@@ -165,7 +164,7 @@
return jobQueue;
}
- public IJobScheduler getScheduler() {
+ public JobScheduler getScheduler() {
return scheduler;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index edc8579..be4945d 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -22,9 +22,8 @@
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.cc.scheduler.IActivityClusterStateMachine;
+import edu.uci.ics.hyracks.control.cc.scheduler.ActivityClusterStateMachine;
public class ActivityCluster {
private final JobRun jobRun;
@@ -41,10 +40,12 @@
private Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
- private IActivityClusterStateMachine acsm;
+ private ActivityClusterStateMachine acsm;
private Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+ private Set<TaskCluster> inProgressTaskClusters;
+
public ActivityCluster(JobRun jobRun, Set<ActivityId> activities) {
this.jobRun = jobRun;
this.activities = activities;
@@ -52,6 +53,7 @@
dependents = new HashSet<ActivityCluster>();
taskStateMap = new HashMap<ActivityId, Task[]>();
partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
+ inProgressTaskClusters = new HashSet<TaskCluster>();
}
public Set<ActivityId> getActivities() {
@@ -86,11 +88,11 @@
return partitionProducingTaskClusterMap;
}
- public IActivityClusterStateMachine getStateMachine() {
+ public ActivityClusterStateMachine getStateMachine() {
return acsm;
}
- public void setStateMachine(IActivityClusterStateMachine acsm) {
+ public void setStateMachine(ActivityClusterStateMachine acsm) {
this.acsm = acsm;
}
@@ -102,14 +104,6 @@
return jobRun.getJobActivityGraph().getJobSpecification().getMaxAttempts();
}
- public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException {
- acsm.notifyTaskClusterFailure(tcAttempt, exception);
- }
-
- public void notifyActivityClusterComplete() throws HyracksException {
- jobRun.getStateMachine().notifyActivityClusterComplete(this);
- }
-
public void setConnectorPolicyMap(Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
this.connectorPolicies = connectorPolicies;
}
@@ -118,7 +112,7 @@
return connectorPolicies;
}
- public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
- acsm.notifyNodeFailures(deadNodes);
+ public Set<TaskCluster> getInProgressTaskClusters() {
+ return inProgressTaskClusters;
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 40f4ae5..b16b4c1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -25,7 +25,7 @@
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
-import edu.uci.ics.hyracks.control.cc.scheduler.IJobRunStateMachine;
+import edu.uci.ics.hyracks.control.cc.scheduler.JobRunStateMachine;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
public class JobRun implements IJobStatusConditionVariable {
@@ -41,7 +41,7 @@
private final Map<ActivityId, ActivityCluster> activityClusterMap;
- private IJobRunStateMachine jsm;
+ private JobRunStateMachine jsm;
private JobStatus status;
@@ -100,18 +100,18 @@
return profile;
}
- public void setStateMachine(IJobRunStateMachine jsm) {
+ public void setStateMachine(JobRunStateMachine jsm) {
this.jsm = jsm;
}
- public IJobRunStateMachine getStateMachine() {
+ public JobRunStateMachine getStateMachine() {
return jsm;
}
public Map<ActivityId, ActivityCluster> getActivityClusterMap() {
return activityClusterMap;
}
-
+
public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
jsm.notifyNodeFailures(deadNodes);
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
index 1bfb69e..d2c14b2 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package edu.uci.ics.hyracks.control.cc.partitions;
import java.util.ArrayList;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
new file mode 100644
index 0000000..704ab51
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.scheduler;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.util.Pair;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+
+public class ActivityClusterGraphBuilder {
+ private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
+
+ private final JobRun jobRun;
+
+ public ActivityClusterGraphBuilder(JobRun jobRun) {
+ this.jobRun = jobRun;
+ }
+
+ private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, JobSpecification spec,
+ Set<ActivityCluster> eqSets) {
+ Map<ActivityId, IActivity> activityNodeMap = jag.getActivityNodeMap();
+ for (ActivityCluster eqSet : eqSets) {
+ for (ActivityId t : eqSet.getActivities()) {
+ IActivity activity = activityNodeMap.get(t);
+ List<Integer> inputList = jag.getActivityInputMap().get(t);
+ if (inputList != null) {
+ for (Integer idx : inputList) {
+ IConnectorDescriptor conn = spec.getInputConnectorDescriptor(activity.getActivityId()
+ .getOperatorDescriptorId(), idx);
+ OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
+ int producerOutputIndex = spec.getProducerOutputIndex(conn);
+ ActivityId inTask = jag.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
+ if (!eqSet.getActivities().contains(inTask)) {
+ return new Pair<ActivityId, ActivityId>(t, inTask);
+ }
+ }
+ }
+ List<Integer> outputList = jag.getActivityOutputMap().get(t);
+ if (outputList != null) {
+ for (Integer idx : outputList) {
+ IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(activity.getActivityId()
+ .getOperatorDescriptorId(), idx);
+ OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
+ int consumerInputIndex = spec.getConsumerInputIndex(conn);
+ ActivityId outTask = jag.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
+ if (!eqSet.getActivities().contains(outTask)) {
+ return new Pair<ActivityId, ActivityId>(t, outTask);
+ }
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public ActivityCluster inferStages(JobActivityGraph jag) {
+ JobSpecification spec = jag.getJobSpecification();
+
+ /*
+         * Build the initial equivalence sets: map each activity id t to the singleton cluster { t }.
+ */
+ Map<ActivityId, ActivityCluster> stageMap = new HashMap<ActivityId, ActivityCluster>();
+ Set<ActivityCluster> stages = new HashSet<ActivityCluster>();
+ for (Set<ActivityId> taskIds : jag.getOperatorActivityMap().values()) {
+ for (ActivityId taskId : taskIds) {
+ Set<ActivityId> eqSet = new HashSet<ActivityId>();
+ eqSet.add(taskId);
+ ActivityCluster stage = new ActivityCluster(jobRun, eqSet);
+ stageMap.put(taskId, stage);
+ stages.add(stage);
+ }
+ }
+
+ boolean changed = true;
+ while (changed) {
+ changed = false;
+ Pair<ActivityId, ActivityId> pair = findMergePair(jag, spec, stages);
+ if (pair != null) {
+ merge(stageMap, stages, pair.first, pair.second);
+ changed = true;
+ }
+ }
+
+ ActivityCluster endStage = new ActivityCluster(jobRun, new HashSet<ActivityId>());
+ Map<ActivityId, Set<ActivityId>> blocker2BlockedMap = jag.getBlocker2BlockedMap();
+ for (ActivityCluster s : stages) {
+ endStage.addDependency(s);
+ s.addDependent(endStage);
+ Set<ActivityCluster> blockedStages = new HashSet<ActivityCluster>();
+ for (ActivityId t : s.getActivities()) {
+ Set<ActivityId> blockedTasks = blocker2BlockedMap.get(t);
+ if (blockedTasks != null) {
+ for (ActivityId bt : blockedTasks) {
+ blockedStages.add(stageMap.get(bt));
+ }
+ }
+ }
+ for (ActivityCluster bs : blockedStages) {
+ bs.addDependency(s);
+ s.addDependent(bs);
+ }
+ }
+ jobRun.getActivityClusterMap().putAll(stageMap);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Inferred " + (stages.size() + 1) + " stages");
+ for (ActivityCluster s : stages) {
+ LOGGER.info(s.toString());
+ }
+ LOGGER.info("SID: ENDSTAGE");
+ }
+ return endStage;
+ }
+
+ private void merge(Map<ActivityId, ActivityCluster> eqSetMap, Set<ActivityCluster> eqSets, ActivityId t1,
+ ActivityId t2) {
+ ActivityCluster stage1 = eqSetMap.get(t1);
+ Set<ActivityId> s1 = stage1.getActivities();
+ ActivityCluster stage2 = eqSetMap.get(t2);
+ Set<ActivityId> s2 = stage2.getActivities();
+
+ Set<ActivityId> mergedSet = new HashSet<ActivityId>();
+ mergedSet.addAll(s1);
+ mergedSet.addAll(s2);
+
+ eqSets.remove(stage1);
+ eqSets.remove(stage2);
+ ActivityCluster mergedStage = new ActivityCluster(jobRun, mergedSet);
+ eqSets.add(mergedStage);
+
+ for (ActivityId t : mergedSet) {
+ eqSetMap.put(t, mergedStage);
+ }
+ }
+}
\ No newline at end of file
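Reviewer note (not part of the patch): inferStages computes connected components of the activity graph by repeatedly merging any two equivalence sets joined by a connector until findMergePair returns null. The same fixpoint, stripped down to integers and undirected edges, for reference:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MergeFixpointSketch {
    // Toy analogue of inferStages/merge: start with singleton sets, then merge
    // the sets of any edge whose endpoints differ, until a fixpoint is reached.
    public static Collection<Set<Integer>> components(int n, int[][] edges) {
        Map<Integer, Set<Integer>> setOf = new HashMap<Integer, Set<Integer>>();
        for (int i = 0; i < n; ++i) {
            Set<Integer> s = new HashSet<Integer>();
            s.add(i);
            setOf.put(i, s);
        }
        boolean changed = true;
        while (changed) {
            changed = false;
            for (int[] e : edges) {
                Set<Integer> s1 = setOf.get(e[0]);
                Set<Integer> s2 = setOf.get(e[1]);
                if (s1 != s2) {
                    s1.addAll(s2);
                    for (Integer v : s2) {
                        setOf.put(v, s1);
                    }
                    changed = true;
                }
            }
        }
        return new HashSet<Set<Integer>>(setOf.values());
    }

    public static void main(String[] args) {
        // Two components: {0, 1, 2} and {3, 4}.
        System.out.println(components(5, new int[][] { { 0, 1 }, { 1, 2 }, { 3, 4 } }));
    }
}
```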
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterStateMachine.java
similarity index 85%
rename from hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
rename to hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterStateMachine.java
index 8d2040e..47eeb06 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterStateMachine.java
@@ -16,7 +16,6 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -31,8 +30,8 @@
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -52,42 +51,29 @@
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-public class DefaultActivityClusterStateMachine implements IActivityClusterStateMachine {
- private static final Logger LOGGER = Logger.getLogger(DefaultActivityClusterStateMachine.class.getName());
+public class ActivityClusterStateMachine {
+ private static final Logger LOGGER = Logger.getLogger(ActivityClusterStateMachine.class.getName());
private final ClusterControllerService ccs;
- private final DefaultJobRunStateMachine jsm;
+ private final JobRunStateMachine jsm;
private final ActivityCluster ac;
- private final PriorityQueue<RankedRunnableTaskCluster> runnableQueue;
-
- private final Set<TaskCluster> inProgressTaskClusters;
-
- public DefaultActivityClusterStateMachine(ClusterControllerService ccs, DefaultJobRunStateMachine jsm,
- ActivityCluster ac) {
+ public ActivityClusterStateMachine(ClusterControllerService ccs, JobRunStateMachine jsm, ActivityCluster ac) {
this.ccs = ccs;
this.jsm = jsm;
this.ac = ac;
- runnableQueue = new PriorityQueue<RankedRunnableTaskCluster>(ac.getTaskClusters().length,
- new Comparator<RankedRunnableTaskCluster>() {
- @Override
- public int compare(RankedRunnableTaskCluster o1, RankedRunnableTaskCluster o2) {
- int cmp = o1.getRank() - o2.getRank();
- return cmp < 0 ? -1 : (cmp > 0 ? 1 : 0);
- }
- });
- inProgressTaskClusters = new HashSet<TaskCluster>();
}
- @Override
public void schedule() throws HyracksException {
startRunnableTaskClusters();
}
private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
throws HyracksException {
+ JobRun jobRun = ac.getJobRun();
+ JobActivityGraph jag = jobRun.getJobActivityGraph();
Task[] tasks = tc.getTasks();
List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
int attempts = tcAttempts.size();
@@ -107,25 +93,31 @@
tcAttempt.setTaskAttempts(taskAttempts);
PartitionConstraintSolver solver = jsm.getSolver();
solver.solve(locationMap.values());
- Map<OperatorDescriptorId, String> operatorLocationAssignmentMap = jsm.getOperatorLocationAssignmentMap();
for (int i = 0; i < tasks.length; ++i) {
Task ts = tasks[i];
TaskId tid = ts.getTaskId();
TaskAttempt taskAttempt = taskAttempts[i];
- String nodeId = operatorLocationAssignmentMap.get(tid.getActivityId().getOperatorDescriptorId());
+ ActivityId aid = tid.getActivityId();
+ Set<ActivityId> blockers = jag.getBlocked2BlockerMap().get(aid);
+ String nodeId = null;
+ if (blockers != null) {
+ for (ActivityId blocker : blockers) {
+ nodeId = findLocationOfBlocker(jobRun, jag, new TaskId(blocker, tid.getPartition()));
+ if (nodeId != null) {
+ break;
+ }
+ }
+ }
+ Set<String> liveNodes = ccs.getNodeMap().keySet();
if (nodeId == null) {
LValueConstraintExpression pLocationExpr = locationMap.get(tid);
Object location = solver.getValue(pLocationExpr);
- Set<String> liveNodes = ccs.getNodeMap().keySet();
if (location == null) {
// pick any
nodeId = liveNodes.toArray(new String[liveNodes.size()])[Math.abs(new Random().nextInt())
% liveNodes.size()];
} else if (location instanceof String) {
nodeId = (String) location;
- if (!liveNodes.contains(nodeId)) {
- throw new HyracksException("Node " + nodeId + " not live");
- }
} else if (location instanceof String[]) {
for (String choice : (String[]) location) {
if (liveNodes.contains(choice)) {
@@ -141,7 +133,12 @@
throw new HyracksException("Unknown type of value for " + pLocationExpr + ": " + location + "("
+ location.getClass() + ")");
}
- operatorLocationAssignmentMap.put(tid.getActivityId().getOperatorDescriptorId(), nodeId);
+ }
+ if (nodeId == null) {
+ throw new HyracksException("No satisfiable location found for " + taskAttempt.getTaskAttemptId());
+ }
+ if (!liveNodes.contains(nodeId)) {
+ throw new HyracksException("Node " + nodeId + " not live");
}
taskAttempt.setNodeId(nodeId);
taskAttempt.setStatus(TaskAttempt.TaskStatus.RUNNING, null);
@@ -157,7 +154,25 @@
tcAttempt.initializePendingTaskCounter();
tcAttempts.add(tcAttempt);
tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.RUNNING);
- inProgressTaskClusters.add(tc);
+ ac.getInProgressTaskClusters().add(tc);
+ }
+
+ private String findLocationOfBlocker(JobRun jobRun, JobActivityGraph jag, TaskId tid) {
+ ActivityId blockerAID = tid.getActivityId();
+ ActivityCluster blockerAC = jobRun.getActivityClusterMap().get(blockerAID);
+ Task[] blockerTasks = blockerAC.getTaskMap().get(blockerAID);
+ List<TaskClusterAttempt> tcAttempts = blockerTasks[tid.getPartition()].getTaskCluster().getAttempts();
+ if (tcAttempts == null || tcAttempts.isEmpty()) {
+ return null;
+ }
+ TaskClusterAttempt lastTCA = tcAttempts.get(tcAttempts.size() - 1);
+ for (TaskAttempt ta : lastTCA.getTaskAttempts()) {
+ TaskId blockerTID = ta.getTaskAttemptId().getTaskId();
+ if (tid.equals(blockerTID)) {
+ return ta.getNodeId();
+ }
+ }
+ return null;
}
private TaskClusterAttempt findLastTaskClusterAttempt(TaskCluster tc) {
@@ -168,7 +183,6 @@
return null;
}
- @Override
public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
TaskAttemptId taId = ta.getTaskAttemptId();
TaskCluster tc = ta.getTaskState().getTaskCluster();
@@ -179,7 +193,7 @@
ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
if (lastAttempt.decrementPendingTasksCounter() == 0) {
lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
- inProgressTaskClusters.remove(tc);
+ ac.getInProgressTaskClusters().remove(tc);
startRunnableTaskClusters();
}
} else {
@@ -191,9 +205,9 @@
}
private void startRunnableTaskClusters() throws HyracksException {
- findRunnableTaskClusters();
+ PriorityQueue<RankedRunnableTaskCluster> queue = findRunnableTaskClusters();
Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
- for (RankedRunnableTaskCluster rrtc : runnableQueue) {
+ for (RankedRunnableTaskCluster rrtc : queue) {
TaskCluster tc = rrtc.getTaskCluster();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Found runnable TC: " + tc);
@@ -203,12 +217,18 @@
LOGGER.info("Status: " + tcAttempt.getStatus());
}
}
- assignTaskLocations(tc, taskAttemptMap);
+ try {
+ assignTaskLocations(tc, taskAttemptMap);
+ } catch (HyracksException e) {
+ abort();
+ ac.getJobRun().getStateMachine().notifyActivityClusterFailure(ac, e);
+ return;
+ }
}
if (taskAttemptMap.isEmpty()) {
- if (inProgressTaskClusters.isEmpty()) {
- ac.notifyActivityClusterComplete();
+ if (ac.getInProgressTaskClusters().isEmpty()) {
+ jsm.notifyActivityClusterComplete(ac);
}
return;
}
@@ -216,7 +236,7 @@
startTasks(taskAttemptMap);
}
- private void findRunnableTaskClusters() {
+ private PriorityQueue<RankedRunnableTaskCluster> findRunnableTaskClusters() {
TaskCluster[] taskClusters = ac.getTaskClusters();
Map<TaskCluster, Integer> runnabilityRanks = new HashMap<TaskCluster, Integer>();
@@ -227,23 +247,23 @@
}
}
- runnableQueue.clear();
+ PriorityQueue<RankedRunnableTaskCluster> result = new PriorityQueue<RankedRunnableTaskCluster>();
for (Map.Entry<TaskCluster, Integer> e : runnabilityRanks.entrySet()) {
TaskCluster tc = e.getKey();
int rank = e.getValue();
if (rank >= 0 && rank < Integer.MAX_VALUE) {
- runnableQueue.add(new RankedRunnableTaskCluster(rank, tc));
+ result.add(new RankedRunnableTaskCluster(rank, tc));
}
}
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Ranked TCs: " + runnableQueue);
+ LOGGER.info("Ranked TCs: " + result);
}
+ return result;
}
/*
* Runnability rank has the following semantics
- * Rank(Completed TaskCluster) = -2
- * Rank(Running TaskCluster) = -1
+ * Rank(Running TaskCluster || Completed TaskCluster) = -1
* Rank(Runnable TaskCluster depending on completed TaskClusters) = 0
* Rank(Runnable TaskCluster) = max(Rank(Dependent TaskClusters)) + 1
* Rank(Non-schedulable TaskCluster) = MAX_VALUE
@@ -305,7 +325,7 @@
public void run() {
try {
node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
- taskDescriptors, connectorPolicies);
+ taskDescriptors, connectorPolicies, null);
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
@@ -352,13 +372,13 @@
});
}
}
+ ac.getInProgressTaskClusters().remove(tcAttempt.getTaskCluster());
TaskCluster tc = tcAttempt.getTaskCluster();
PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
pmm.removeUncommittedPartitions(tc.getProducedPartitions(), abortTaskIds);
pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
}
- @Override
public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException {
TaskAttemptId taId = ta.getTaskAttemptId();
TaskCluster tc = ta.getTaskState().getTaskCluster();
@@ -370,7 +390,7 @@
abortTaskCluster(lastAttempt);
lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
abortDoomedTaskClusters();
- ac.notifyTaskClusterFailure(lastAttempt, exception);
+ notifyTaskClusterFailure(lastAttempt, exception);
} else {
LOGGER.warning("Spurious task failure notification: " + taId + " Current state = " + taStatus);
}
@@ -433,7 +453,6 @@
return doomed;
}
- @Override
public void abort() throws HyracksException {
TaskCluster[] taskClusters = ac.getTaskClusters();
for (TaskCluster tc : taskClusters) {
@@ -448,7 +467,6 @@
}
}
- @Override
public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException {
TaskCluster tc = tcAttempt.getTaskCluster();
if (tcAttempt.getAttempt() >= ac.getMaxTaskClusterAttempts()) {
@@ -457,11 +475,16 @@
return;
}
Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
- assignTaskLocations(tc, taskAttemptMap);
+ try {
+ assignTaskLocations(tc, taskAttemptMap);
+ } catch (HyracksException e) {
+ abort();
+ ac.getJobRun().getStateMachine().notifyActivityClusterFailure(ac, e);
+ return;
+ }
startTasks(taskAttemptMap);
}
- @Override
public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
for (TaskCluster tc : ac.getTaskClusters()) {
TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
@@ -482,6 +505,9 @@
}
}
abortDoomedTaskClusters();
- startRunnableTaskClusters();
+ }
+
+ public boolean canMakeProgress() {
+ return true;
}
}
\ No newline at end of file
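Reviewer note (not part of the patch): the rank comment in this file now collapses Running and Completed TaskClusters to a single -1 rank (previously -2 and -1). A literal, standalone transcription of that recurrence, assuming a cluster's rank is one more than the maximum over its prerequisites; the production rank computation is elided from this hunk, so treat this purely as a reading of the comment:

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RunnabilityRankSketch {
    // Rank(running or completed) = -1; Rank(runnable, all prerequisites done
    // or running) = 0; otherwise 1 + max over prerequisites; MAX_VALUE means
    // the cluster can never be scheduled (e.g. an unsatisfiable prerequisite).
    public static int rank(String tc, Map<String, List<String>> prereqs, Set<String> doneOrRunning,
            Set<String> unschedulable) {
        if (unschedulable.contains(tc)) {
            return Integer.MAX_VALUE;
        }
        if (doneOrRunning.contains(tc)) {
            return -1;
        }
        List<String> ds = prereqs.containsKey(tc) ? prereqs.get(tc) : Collections.<String> emptyList();
        int max = -1;
        for (String d : ds) {
            int r = rank(d, prereqs, doneOrRunning, unschedulable);
            if (r == Integer.MAX_VALUE) {
                return Integer.MAX_VALUE; // unsatisfiable prerequisite dooms this cluster
            }
            max = Math.max(max, r);
        }
        return max + 1;
    }
}
```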
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
deleted file mode 100644
index 16b7f5c..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.scheduler;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
-import edu.uci.ics.hyracks.api.constraints.Constraint;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
-import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
-import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IActivity;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
-import edu.uci.ics.hyracks.api.dataflow.connectors.PipelinedConnectorPolicy;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.api.util.Pair;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-import edu.uci.ics.hyracks.control.cc.job.IConnectorDescriptorVisitor;
-import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
-import edu.uci.ics.hyracks.control.cc.job.Task;
-import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
-import edu.uci.ics.hyracks.control.cc.job.manager.events.JobCleanupEvent;
-
-public class DefaultJobRunStateMachine implements IJobRunStateMachine {
- private static final Logger LOGGER = Logger.getLogger(DefaultJobRunStateMachine.class.getName());
-
- private final ClusterControllerService ccs;
-
- private final JobRun jobRun;
-
- private final Map<OperatorDescriptorId, String> operatorLocationAssignmentMap;
-
- private final Set<ActivityCluster> completedClusters;
-
- private final Set<ActivityCluster> inProgressClusters;
-
- private PartitionConstraintSolver solver;
-
- private ActivityCluster rootActivityCluster;
-
- public DefaultJobRunStateMachine(ClusterControllerService ccs, JobRun jobRun) {
- this.ccs = ccs;
- this.jobRun = jobRun;
- this.operatorLocationAssignmentMap = new HashMap<OperatorDescriptorId, String>();
- completedClusters = new HashSet<ActivityCluster>();
- inProgressClusters = new HashSet<ActivityCluster>();
- }
-
- public Map<OperatorDescriptorId, String> getOperatorLocationAssignmentMap() {
- return operatorLocationAssignmentMap;
- }
-
- public PartitionConstraintSolver getSolver() {
- return solver;
- }
-
- private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, JobSpecification spec,
- Set<ActivityCluster> eqSets) {
- Map<ActivityId, IActivity> activityNodeMap = jag.getActivityNodeMap();
- for (ActivityCluster eqSet : eqSets) {
- for (ActivityId t : eqSet.getActivities()) {
- IActivity activity = activityNodeMap.get(t);
- List<Integer> inputList = jag.getActivityInputMap().get(t);
- if (inputList != null) {
- for (Integer idx : inputList) {
- IConnectorDescriptor conn = spec.getInputConnectorDescriptor(activity.getActivityId()
- .getOperatorDescriptorId(), idx);
- OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
- int producerOutputIndex = spec.getProducerOutputIndex(conn);
- ActivityId inTask = jag.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
- if (!eqSet.getActivities().contains(inTask)) {
- return new Pair<ActivityId, ActivityId>(t, inTask);
- }
- }
- }
- List<Integer> outputList = jag.getActivityOutputMap().get(t);
- if (outputList != null) {
- for (Integer idx : outputList) {
- IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(activity.getActivityId()
- .getOperatorDescriptorId(), idx);
- OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
- int consumerInputIndex = spec.getConsumerInputIndex(conn);
- ActivityId outTask = jag.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
- if (!eqSet.getActivities().contains(outTask)) {
- return new Pair<ActivityId, ActivityId>(t, outTask);
- }
- }
- }
- }
- }
- return null;
- }
-
- private ActivityCluster inferStages(JobActivityGraph jag) {
- JobSpecification spec = jag.getJobSpecification();
-
- /*
- * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
- */
- Map<ActivityId, ActivityCluster> stageMap = new HashMap<ActivityId, ActivityCluster>();
- Set<ActivityCluster> stages = new HashSet<ActivityCluster>();
- for (Set<ActivityId> taskIds : jag.getOperatorActivityMap().values()) {
- for (ActivityId taskId : taskIds) {
- Set<ActivityId> eqSet = new HashSet<ActivityId>();
- eqSet.add(taskId);
- ActivityCluster stage = new ActivityCluster(jobRun, eqSet);
- stageMap.put(taskId, stage);
- stages.add(stage);
- }
- }
-
- boolean changed = true;
- while (changed) {
- changed = false;
- Pair<ActivityId, ActivityId> pair = findMergePair(jag, spec, stages);
- if (pair != null) {
- merge(stageMap, stages, pair.first, pair.second);
- changed = true;
- }
- }
-
- ActivityCluster endStage = new ActivityCluster(jobRun, new HashSet<ActivityId>());
- Map<ActivityId, Set<ActivityId>> blocker2BlockedMap = jag.getBlocker2BlockedMap();
- for (ActivityCluster s : stages) {
- endStage.addDependency(s);
- s.addDependent(endStage);
- Set<ActivityCluster> blockedStages = new HashSet<ActivityCluster>();
- for (ActivityId t : s.getActivities()) {
- Set<ActivityId> blockedTasks = blocker2BlockedMap.get(t);
- if (blockedTasks != null) {
- for (ActivityId bt : blockedTasks) {
- blockedStages.add(stageMap.get(bt));
- }
- }
- }
- for (ActivityCluster bs : blockedStages) {
- bs.addDependency(s);
- s.addDependent(bs);
- }
- }
- jobRun.getActivityClusterMap().putAll(stageMap);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Inferred " + (stages.size() + 1) + " stages");
- for (ActivityCluster s : stages) {
- LOGGER.info(s.toString());
- }
- LOGGER.info("SID: ENDSTAGE");
- }
- return endStage;
- }
-
- private void merge(Map<ActivityId, ActivityCluster> eqSetMap, Set<ActivityCluster> eqSets, ActivityId t1,
- ActivityId t2) {
- ActivityCluster stage1 = eqSetMap.get(t1);
- Set<ActivityId> s1 = stage1.getActivities();
- ActivityCluster stage2 = eqSetMap.get(t2);
- Set<ActivityId> s2 = stage2.getActivities();
-
- Set<ActivityId> mergedSet = new HashSet<ActivityId>();
- mergedSet.addAll(s1);
- mergedSet.addAll(s2);
-
- eqSets.remove(stage1);
- eqSets.remove(stage2);
- ActivityCluster mergedStage = new ActivityCluster(jobRun, mergedSet);
- eqSets.add(mergedStage);
-
- for (ActivityId t : mergedSet) {
- eqSetMap.put(t, mergedStage);
- }
- }
-
- private void findRunnableActivityClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
- if (completedClusters.contains(candidate) || frontier.contains(candidate)
- || inProgressClusters.contains(candidate)) {
- return;
- }
- boolean runnable = true;
- for (ActivityCluster s : candidate.getDependencies()) {
- if (!completedClusters.contains(s)) {
- runnable = false;
- findRunnableActivityClusters(frontier, s);
- }
- }
- if (runnable && candidate != rootActivityCluster) {
- frontier.add(candidate);
- }
- }
-
- private void findRunnableActivityClusters(Set<ActivityCluster> frontier) {
- findRunnableActivityClusters(frontier, rootActivityCluster);
- }
-
- @Override
- public void schedule() throws HyracksException {
- try {
- solver = new PartitionConstraintSolver();
- final JobActivityGraph jag = jobRun.getJobActivityGraph();
- final ICCApplicationContext appCtx = ccs.getApplicationMap().get(jag.getApplicationName());
- JobSpecification spec = jag.getJobSpecification();
- final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
- final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
- @Override
- public void addConstraint(Constraint constraint) {
- contributedConstraints.add(constraint);
- }
- };
- PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
- @Override
- public void visit(IOperatorDescriptor op) {
- op.contributeSchedulingConstraints(acceptor, jag, appCtx);
- }
- });
- PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
- @Override
- public void visit(IConnectorDescriptor conn) {
- conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
- }
- });
- contributedConstraints.addAll(spec.getUserConstraints());
- solver.addConstraints(contributedConstraints);
-
- rootActivityCluster = inferStages(jag);
- startRunnableActivityClusters();
- } catch (Exception e) {
- e.printStackTrace();
- ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.FAILURE, e));
- throw new HyracksException(e);
- }
- }
-
- private void startRunnableActivityClusters() throws HyracksException {
- Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
- findRunnableActivityClusters(runnableClusters);
- if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
- ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
- return;
- }
- for (ActivityCluster ac : runnableClusters) {
- inProgressClusters.add(ac);
- buildTaskClusters(ac);
- IActivityClusterStateMachine acsm = new DefaultActivityClusterStateMachine(ccs, this, ac);
- ac.setStateMachine(acsm);
- acsm.schedule();
- }
- }
-
- private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
- throws HyracksException {
- Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
- for (ActivityId anId : ac.getActivities()) {
- lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
- }
- solver.solve(lValues);
- Map<OperatorDescriptorId, Integer> nPartMap = new HashMap<OperatorDescriptorId, Integer>();
- for (LValueConstraintExpression lv : lValues) {
- Object value = solver.getValue(lv);
- if (value == null) {
- throw new HyracksException("No value found for " + lv);
- }
- if (!(value instanceof Number)) {
- throw new HyracksException("Unexpected type of value bound to " + lv + ": " + value.getClass() + "("
- + value + ")");
- }
- int nParts = ((Number) value).intValue();
- if (nParts <= 0) {
- throw new HyracksException("Unsatisfiable number of partitions for " + lv + ": " + nParts);
- }
- nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
- }
- Map<ActivityId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityId, ActivityPartitionDetails>();
- for (ActivityId anId : ac.getActivities()) {
- int nParts = nPartMap.get(anId.getOperatorDescriptorId());
- int[] nInputPartitions = null;
- List<IConnectorDescriptor> inputs = jobRun.getJobActivityGraph().getActivityInputConnectorDescriptors(anId);
- if (inputs != null) {
- nInputPartitions = new int[inputs.size()];
- for (int i = 0; i < nInputPartitions.length; ++i) {
- nInputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
- .getProducerActivity(inputs.get(i).getConnectorId()).getOperatorDescriptorId());
- }
- }
- int[] nOutputPartitions = null;
- List<IConnectorDescriptor> outputs = jobRun.getJobActivityGraph().getActivityOutputConnectorDescriptors(
- anId);
- if (outputs != null) {
- nOutputPartitions = new int[outputs.size()];
- for (int i = 0; i < nOutputPartitions.length; ++i) {
- nOutputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
- .getConsumerActivity(outputs.get(i).getConnectorId()).getOperatorDescriptorId());
- }
- }
- ActivityPartitionDetails apd = new ActivityPartitionDetails(nParts, nInputPartitions, nOutputPartitions);
- activityPartsMap.put(anId, apd);
- }
- return activityPartsMap;
- }
-
- private void buildTaskClusters(ActivityCluster ac) throws HyracksException {
- Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
-
- Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
- Set<ActivityId> activities = ac.getActivities();
-
- Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
-
- for (ActivityId anId : activities) {
- ActivityPartitionDetails apd = pcMap.get(anId);
- Task[] taskStates = new Task[apd.getPartitionCount()];
- for (int i = 0; i < taskStates.length; ++i) {
- TaskId tid = new TaskId(anId, i);
- taskStates[i] = new Task(tid, apd);
- Set<TaskId> cluster = new HashSet<TaskId>();
- cluster.add(tid);
- taskClusterMap.put(tid, cluster);
- }
- taskStateMap.put(anId, taskStates);
- }
-
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, pcMap);
- ac.setConnectorPolicyMap(connectorPolicies);
-
- Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
- JobActivityGraph jag = jobRun.getJobActivityGraph();
- BitSet targetBitmap = new BitSet();
- for (ActivityId ac1 : activities) {
- Task[] ac1TaskStates = taskStateMap.get(ac1);
- int nProducers = ac1TaskStates.length;
- List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
- if (outputConns != null) {
- for (IConnectorDescriptor c : outputConns) {
- ConnectorDescriptorId cdId = c.getConnectorId();
- ActivityId ac2 = jag.getConsumerActivity(cdId);
- Task[] ac2TaskStates = taskStateMap.get(ac2);
- int nConsumers = ac2TaskStates.length;
- for (int i = 0; i < nProducers; ++i) {
- c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
- List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
- .getTaskId());
- if (cInfoList == null) {
- cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
- taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
- }
- Set<TaskId> cluster = taskClusterMap.get(ac1TaskStates[i].getTaskId());
- for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
- cInfoList.add(new Pair<TaskId, ConnectorDescriptorId>(ac2TaskStates[j].getTaskId(), cdId));
- IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
- if (cPolicy.requiresProducerConsumerCoscheduling()) {
- cluster.add(ac2TaskStates[j].getTaskId());
- }
- }
- }
- }
- }
- }
-
- boolean done = false;
- while (!done) {
- done = true;
- Set<TaskId> set = new HashSet<TaskId>();
- Set<TaskId> oldSet = null;
- for (Map.Entry<TaskId, Set<TaskId>> e : taskClusterMap.entrySet()) {
- set.clear();
- oldSet = e.getValue();
- set.addAll(e.getValue());
- for (TaskId tid : e.getValue()) {
- set.addAll(taskClusterMap.get(tid));
- }
- for (TaskId tid : set) {
- Set<TaskId> targetSet = taskClusterMap.get(tid);
- if (!targetSet.equals(set)) {
- done = false;
- break;
- }
- }
- if (!done) {
- break;
- }
- }
- for (TaskId tid : oldSet) {
- taskClusterMap.put(tid, set);
- }
- }
-
- Set<Set<TaskId>> clusters = new HashSet<Set<TaskId>>(taskClusterMap.values());
- Set<TaskCluster> tcSet = new HashSet<TaskCluster>();
- for (Set<TaskId> cluster : clusters) {
- Set<Task> taskStates = new HashSet<Task>();
- for (TaskId tid : cluster) {
- taskStates.add(taskStateMap.get(tid.getActivityId())[tid.getPartition()]);
- }
- TaskCluster tc = new TaskCluster(ac, taskStates.toArray(new Task[taskStates.size()]));
- tcSet.add(tc);
- for (TaskId tid : cluster) {
- taskStateMap.get(tid.getActivityId())[tid.getPartition()].setTaskCluster(tc);
- }
- }
- ac.setTaskClusters(tcSet.toArray(new TaskCluster[tcSet.size()]));
-
- Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap = ac.getPartitionProducingTaskClusterMap();
- for (TaskCluster tc : ac.getTaskClusters()) {
- for (Task ts : tc.getTasks()) {
- TaskId tid = ts.getTaskId();
- List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
- if (cInfoList != null) {
- for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
- Task targetTS = taskStateMap.get(p.first.getActivityId())[p.first.getPartition()];
- TaskCluster targetTC = targetTS.getTaskCluster();
- if (targetTC != tc) {
- ConnectorDescriptorId cdId = p.second;
- PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(),
- p.first.getPartition());
- tc.getProducedPartitions().add(pid);
- targetTC.getRequiredPartitions().add(pid);
- partitionProducingTaskClusterMap.put(pid, tc);
- }
- }
- }
- }
- }
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Plan for " + ac);
- LOGGER.info("Built " + tcSet.size() + " Task Clusters");
- for (TaskCluster tc : tcSet) {
- LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
- }
- }
- }
-
- private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
- Map<ActivityId, ActivityPartitionDetails> pcMap) {
- JobActivityGraph jag = jobRun.getJobActivityGraph();
- Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
- Set<ActivityId> activities = ac.getActivities();
- Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
- BitSet targetBitmap = new BitSet();
- for (ActivityId ac1 : activities) {
- Task[] ac1TaskStates = taskStateMap.get(ac1);
- int nProducers = ac1TaskStates.length;
- List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
- if (outputConns != null) {
- for (IConnectorDescriptor c : outputConns) {
- ConnectorDescriptorId cdId = c.getConnectorId();
- ActivityId ac2 = jag.getConsumerActivity(cdId);
- Task[] ac2TaskStates = taskStateMap.get(ac2);
- int nConsumers = ac2TaskStates.length;
-
- int[] fanouts = new int[nProducers];
- for (int i = 0; i < nProducers; ++i) {
- c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
- fanouts[i] = targetBitmap.cardinality();
- }
- IConnectorPolicy cp = assignConnectorPolicy(c, nProducers, nConsumers, fanouts);
- cPolicyMap.put(cdId, cp);
- }
- }
- }
- return cPolicyMap;
- }
-
- private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
- IConnectorPolicyAssignmentPolicy cpap = jobRun.getJobActivityGraph().getJobSpecification()
- .getConnectorPolicyAssignmentPolicy();
- if (cpap != null) {
- return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
- }
- return new PipelinedConnectorPolicy();
- }
-
- @Override
- public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException {
- for (ActivityCluster ac2 : inProgressClusters) {
- abortActivityCluster(ac2);
- }
- jobRun.setStatus(JobStatus.FAILURE, exception);
- }
-
- private void abortActivityCluster(ActivityCluster ac) throws HyracksException {
- ac.getStateMachine().abort();
- }
-
- @Override
- public void notifyActivityClusterComplete(ActivityCluster ac) throws HyracksException {
- completedClusters.add(ac);
- inProgressClusters.remove(ac);
- startRunnableActivityClusters();
- }
-
- @Override
- public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
- jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
- if (!inProgressClusters.isEmpty()) {
- for (ActivityCluster ac : inProgressClusters) {
- ac.notifyNodeFailures(deadNodes);
- }
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
deleted file mode 100644
index c6d85dd..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IActivityClusterStateMachine.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.scheduler;
-
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
-import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
-
-public interface IActivityClusterStateMachine {
- public void schedule() throws HyracksException;
-
- public void abort() throws HyracksException;
-
- public void notifyTaskComplete(TaskAttempt ta) throws HyracksException;
-
- public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException;
-
- public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException;
-
- public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException;
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
deleted file mode 100644
index bbf23d5..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/IJobRunStateMachine.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.scheduler;
-
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-
-public interface IJobRunStateMachine {
- public void schedule() throws HyracksException;
-
- public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException;
-
- public void notifyActivityClusterComplete(ActivityCluster activityCluster) throws HyracksException;
-
- public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException;
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobRunStateMachine.java
new file mode 100644
index 0000000..ce2770a
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobRunStateMachine.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.scheduler;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.IConnectorDescriptorVisitor;
+import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
+import edu.uci.ics.hyracks.control.cc.job.manager.events.JobCleanupEvent;
+
+public class JobRunStateMachine {
+ private final ClusterControllerService ccs;
+
+ private final JobRun jobRun;
+
+ private final Set<ActivityCluster> completedClusters;
+
+ private final Set<ActivityCluster> inProgressClusters;
+
+ private PartitionConstraintSolver solver;
+
+ private ActivityCluster rootActivityCluster;
+
+ public JobRunStateMachine(ClusterControllerService ccs, JobRun jobRun) {
+ this.ccs = ccs;
+ this.jobRun = jobRun;
+ completedClusters = new HashSet<ActivityCluster>();
+ inProgressClusters = new HashSet<ActivityCluster>();
+ }
+
+ public PartitionConstraintSolver getSolver() {
+ return solver;
+ }
+
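+    // Depth-first walk from the root: a candidate joins the runnable frontier only
+    // when every one of its dependencies has completed (the root cluster itself is
+    // never added); for each unfinished dependency the walk recurses to find
+    // runnable ancestors.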
+ private void findRunnableActivityClusters(Set<ActivityCluster> frontier, ActivityCluster candidate) {
+ if (completedClusters.contains(candidate) || frontier.contains(candidate)
+ || inProgressClusters.contains(candidate)) {
+ return;
+ }
+ boolean runnable = true;
+ for (ActivityCluster s : candidate.getDependencies()) {
+ if (!completedClusters.contains(s)) {
+ runnable = false;
+ findRunnableActivityClusters(frontier, s);
+ }
+ }
+ if (runnable && candidate != rootActivityCluster) {
+ frontier.add(candidate);
+ }
+ }
+
+ private void findRunnableActivityClusters(Set<ActivityCluster> frontier) {
+ findRunnableActivityClusters(frontier, rootActivityCluster);
+ }
+
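+    // Gathers scheduling constraints from every operator and connector in the job
+    // specification (plus user-supplied constraints), seeds the constraint solver,
+    // infers the activity-cluster graph, and starts the runnable clusters. Any
+    // failure is reported as a job cleanup event before being rethrown.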
+ public void schedule() throws HyracksException {
+ try {
+ solver = new PartitionConstraintSolver();
+ final JobActivityGraph jag = jobRun.getJobActivityGraph();
+ final ICCApplicationContext appCtx = ccs.getApplicationMap().get(jag.getApplicationName());
+ JobSpecification spec = jag.getJobSpecification();
+ final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
+ final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
+ @Override
+ public void addConstraint(Constraint constraint) {
+ contributedConstraints.add(constraint);
+ }
+ };
+ PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+ @Override
+ public void visit(IOperatorDescriptor op) {
+ op.contributeSchedulingConstraints(acceptor, jag, appCtx);
+ }
+ });
+ PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+ @Override
+ public void visit(IConnectorDescriptor conn) {
+ conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
+ }
+ });
+ contributedConstraints.addAll(spec.getUserConstraints());
+ solver.addConstraints(contributedConstraints);
+
+ ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder(jobRun);
+ rootActivityCluster = acgb.inferStages(jag);
+ startRunnableActivityClusters();
+ } catch (Exception e) {
+ e.printStackTrace();
+ ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.FAILURE, e));
+ throw new HyracksException(e);
+ }
+ }
+
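+    // Computes the current runnable frontier. An empty frontier with nothing in
+    // progress means the job is done; otherwise each runnable cluster gets its
+    // task clusters built and its own state machine scheduled.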
+ private void startRunnableActivityClusters() throws HyracksException {
+ Set<ActivityCluster> runnableClusters = new HashSet<ActivityCluster>();
+ findRunnableActivityClusters(runnableClusters);
+ if (runnableClusters.isEmpty() && inProgressClusters.isEmpty()) {
+ ccs.getJobQueue().schedule(new JobCleanupEvent(ccs, jobRun.getJobId(), JobStatus.TERMINATED, null));
+ return;
+ }
+ for (ActivityCluster ac : runnableClusters) {
+ inProgressClusters.add(ac);
+ TaskClusterBuilder tcb = new TaskClusterBuilder(jobRun, solver);
+ tcb.buildTaskClusters(ac);
+ ActivityClusterStateMachine acsm = new ActivityClusterStateMachine(ccs, this, ac);
+ ac.setStateMachine(acsm);
+ acsm.schedule();
+ }
+ }
+
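+    // A single activity-cluster failure aborts every in-progress cluster and
+    // marks the whole job run as failed.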
+ public void notifyActivityClusterFailure(ActivityCluster ac, Exception exception) throws HyracksException {
+ for (ActivityCluster ac2 : inProgressClusters) {
+ abortActivityCluster(ac2);
+ }
+ jobRun.setStatus(JobStatus.FAILURE, exception);
+ }
+
+ private void abortActivityCluster(ActivityCluster ac) throws HyracksException {
+ ac.getStateMachine().abort();
+ }
+
+ public void notifyActivityClusterComplete(ActivityCluster ac) throws HyracksException {
+ completedClusters.add(ac);
+ inProgressClusters.remove(ac);
+ startRunnableActivityClusters();
+ }
+
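+    // Node failures are pushed to the partition match-maker and to every cluster
+    // state machine (completed and in-progress); in-progress clusters that can
+    // still make progress are then re-scheduled.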
+ public void notifyNodeFailures(Set<String> deadNodes) throws HyracksException {
+ jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
+ for (ActivityCluster ac : completedClusters) {
+ ac.getStateMachine().notifyNodeFailures(deadNodes);
+ }
+ for (ActivityCluster ac : inProgressClusters) {
+ ac.getStateMachine().notifyNodeFailures(deadNodes);
+ }
+ for (ActivityCluster ac : inProgressClusters) {
+ ActivityClusterStateMachine acsm = ac.getStateMachine();
+ if (acsm.canMakeProgress()) {
+ acsm.schedule();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobScheduler.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
similarity index 83%
rename from hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobScheduler.java
rename to hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index c71b23b..44aa1b8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobScheduler.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -18,16 +18,15 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
-public class DefaultJobScheduler implements IJobScheduler {
+public class JobScheduler {
private final ClusterControllerService ccs;
- public DefaultJobScheduler(ClusterControllerService ccs) {
+ public JobScheduler(ClusterControllerService ccs) {
this.ccs = ccs;
}
- @Override
public void notifyJobCreation(JobRun run) throws HyracksException {
- IJobRunStateMachine jsm = new DefaultJobRunStateMachine(ccs, run);
+ JobRunStateMachine jsm = new JobRunStateMachine(ccs, run);
run.setStateMachine(jsm);
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
index fa11643..27e9480 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/RankedRunnableTaskCluster.java
@@ -16,7 +16,7 @@
import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
-public class RankedRunnableTaskCluster {
+public class RankedRunnableTaskCluster implements Comparable<RankedRunnableTaskCluster> {
private final int rank;
private final TaskCluster taskCluster;
@@ -38,4 +38,10 @@
public String toString() {
return "[" + rank + ":" + taskCluster + "]";
}
+
+ @Override
+    public int compareTo(RankedRunnableTaskCluster o) {
+        // Compare directly instead of subtracting: rank - o.rank can overflow int.
+        return rank < o.rank ? -1 : (rank > o.rank ? 1 : 0);
+    }
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/TaskClusterBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/TaskClusterBuilder.java
new file mode 100644
index 0000000..bbcb83d
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/TaskClusterBuilder.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.scheduler;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipelinedConnectorPolicy;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.util.Pair;
+import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.cc.job.Task;
+import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
+
+public class TaskClusterBuilder {
+ private static final Logger LOGGER = Logger.getLogger(TaskClusterBuilder.class.getName());
+
+ private final JobRun jobRun;
+ private final PartitionConstraintSolver solver;
+
+ public TaskClusterBuilder(JobRun jobRun, PartitionConstraintSolver solver) {
+ this.jobRun = jobRun;
+ this.solver = solver;
+ }
+
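+    // Builds the task clusters for one activity cluster: solve the partition
+    // counts, create one Task per (activity, partition), assign connector
+    // policies, merge co-scheduled tasks into clusters, and record the
+    // inter-cluster partition dependencies.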
+ public void buildTaskClusters(ActivityCluster ac) throws HyracksException {
+ Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
+
+ Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
+ Set<ActivityId> activities = ac.getActivities();
+
+ Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
+
+ for (ActivityId anId : activities) {
+ ActivityPartitionDetails apd = pcMap.get(anId);
+ Task[] taskStates = new Task[apd.getPartitionCount()];
+ for (int i = 0; i < taskStates.length; ++i) {
+ TaskId tid = new TaskId(anId, i);
+ taskStates[i] = new Task(tid, apd);
+ Set<TaskId> cluster = new HashSet<TaskId>();
+ cluster.add(tid);
+ taskClusterMap.put(tid, cluster);
+ }
+ taskStateMap.put(anId, taskStates);
+ }
+
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, pcMap);
+ ac.setConnectorPolicyMap(connectorPolicies);
+
+ Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
+ JobActivityGraph jag = jobRun.getJobActivityGraph();
+ BitSet targetBitmap = new BitSet();
+ for (ActivityId ac1 : activities) {
+ Task[] ac1TaskStates = taskStateMap.get(ac1);
+ int nProducers = ac1TaskStates.length;
+ List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+ if (outputConns != null) {
+ for (IConnectorDescriptor c : outputConns) {
+ ConnectorDescriptorId cdId = c.getConnectorId();
+ ActivityId ac2 = jag.getConsumerActivity(cdId);
+ Task[] ac2TaskStates = taskStateMap.get(ac2);
+ int nConsumers = ac2TaskStates.length;
+ for (int i = 0; i < nProducers; ++i) {
+ c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+ List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
+ .getTaskId());
+ if (cInfoList == null) {
+ cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+ taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
+ }
+ Set<TaskId> cluster = taskClusterMap.get(ac1TaskStates[i].getTaskId());
+ for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
+ cInfoList.add(new Pair<TaskId, ConnectorDescriptorId>(ac2TaskStates[j].getTaskId(), cdId));
+ IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
+ if (cPolicy.requiresProducerConsumerCoscheduling()) {
+ cluster.add(ac2TaskStates[j].getTaskId());
+ }
+ }
+ }
+ }
+ }
+ }
+
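+        // Fixed point: repeatedly union each task's cluster with the clusters of
+        // its co-scheduled peers until every member of a set maps to that same set.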
+ boolean done = false;
+ while (!done) {
+ done = true;
+ Set<TaskId> set = new HashSet<TaskId>();
+ Set<TaskId> oldSet = null;
+ for (Map.Entry<TaskId, Set<TaskId>> e : taskClusterMap.entrySet()) {
+ set.clear();
+ oldSet = e.getValue();
+ set.addAll(e.getValue());
+ for (TaskId tid : e.getValue()) {
+ set.addAll(taskClusterMap.get(tid));
+ }
+ for (TaskId tid : set) {
+ Set<TaskId> targetSet = taskClusterMap.get(tid);
+ if (!targetSet.equals(set)) {
+ done = false;
+ break;
+ }
+ }
+ if (!done) {
+ break;
+ }
+ }
+            // Guard: if the task-cluster map was empty, no entry was visited and
+            // oldSet is still null.
+            if (oldSet != null) {
+                for (TaskId tid : oldSet) {
+                    taskClusterMap.put(tid, set);
+                }
+            }
+ }
+
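+        // Materialize the merged TaskId sets into TaskCluster objects and link
+        // each Task back to the cluster that owns it.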
+ Set<Set<TaskId>> clusters = new HashSet<Set<TaskId>>(taskClusterMap.values());
+ Set<TaskCluster> tcSet = new HashSet<TaskCluster>();
+ for (Set<TaskId> cluster : clusters) {
+ Set<Task> taskStates = new HashSet<Task>();
+ for (TaskId tid : cluster) {
+ taskStates.add(taskStateMap.get(tid.getActivityId())[tid.getPartition()]);
+ }
+ TaskCluster tc = new TaskCluster(ac, taskStates.toArray(new Task[taskStates.size()]));
+ tcSet.add(tc);
+ for (TaskId tid : cluster) {
+ taskStateMap.get(tid.getActivityId())[tid.getPartition()].setTaskCluster(tc);
+ }
+ }
+ ac.setTaskClusters(tcSet.toArray(new TaskCluster[tcSet.size()]));
+
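+        // Record every partition that crosses a task-cluster boundary: the
+        // producing cluster advertises it, the consuming cluster requires it.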
+ Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap = ac.getPartitionProducingTaskClusterMap();
+ for (TaskCluster tc : ac.getTaskClusters()) {
+ for (Task ts : tc.getTasks()) {
+ TaskId tid = ts.getTaskId();
+ List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
+ if (cInfoList != null) {
+ for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
+ Task targetTS = taskStateMap.get(p.first.getActivityId())[p.first.getPartition()];
+ TaskCluster targetTC = targetTS.getTaskCluster();
+ if (targetTC != tc) {
+ ConnectorDescriptorId cdId = p.second;
+ PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(),
+ p.first.getPartition());
+ tc.getProducedPartitions().add(pid);
+ targetTC.getRequiredPartitions().add(pid);
+ partitionProducingTaskClusterMap.put(pid, tc);
+ }
+ }
+ }
+ }
+ }
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Plan for " + ac);
+ LOGGER.info("Built " + tcSet.size() + " Task Clusters");
+ for (TaskCluster tc : tcSet) {
+ LOGGER.info("Tasks: " + Arrays.toString(tc.getTasks()));
+ }
+ }
+ }
+
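+    // For each connector in the cluster, compute every producer's fanout (its
+    // number of target partitions) and pick a connector policy for it.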
+ private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
+ Map<ActivityId, ActivityPartitionDetails> pcMap) {
+ JobActivityGraph jag = jobRun.getJobActivityGraph();
+ Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+ Set<ActivityId> activities = ac.getActivities();
+ Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
+ BitSet targetBitmap = new BitSet();
+ for (ActivityId ac1 : activities) {
+ Task[] ac1TaskStates = taskStateMap.get(ac1);
+ int nProducers = ac1TaskStates.length;
+ List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+ if (outputConns != null) {
+ for (IConnectorDescriptor c : outputConns) {
+ ConnectorDescriptorId cdId = c.getConnectorId();
+ ActivityId ac2 = jag.getConsumerActivity(cdId);
+ Task[] ac2TaskStates = taskStateMap.get(ac2);
+ int nConsumers = ac2TaskStates.length;
+
+ int[] fanouts = new int[nProducers];
+ for (int i = 0; i < nProducers; ++i) {
+ c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+ fanouts[i] = targetBitmap.cardinality();
+ }
+ IConnectorPolicy cp = assignConnectorPolicy(c, nProducers, nConsumers, fanouts);
+ cPolicyMap.put(cdId, cp);
+ }
+ }
+ }
+ return cPolicyMap;
+ }
+
+ private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
+ IConnectorPolicyAssignmentPolicy cpap = jobRun.getJobActivityGraph().getJobSpecification()
+ .getConnectorPolicyAssignmentPolicy();
+ if (cpap != null) {
+ return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
+ }
+ return new PipelinedConnectorPolicy();
+ }
+
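+    // Solves a PartitionCountExpression for every operator in the cluster,
+    // validates that each binding is a positive integer, and derives the
+    // input/output partition counts for every activity.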
+ private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
+ throws HyracksException {
+ Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
+ for (ActivityId anId : ac.getActivities()) {
+ lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
+ }
+ solver.solve(lValues);
+ Map<OperatorDescriptorId, Integer> nPartMap = new HashMap<OperatorDescriptorId, Integer>();
+ for (LValueConstraintExpression lv : lValues) {
+ Object value = solver.getValue(lv);
+ if (value == null) {
+ throw new HyracksException("No value found for " + lv);
+ }
+ if (!(value instanceof Number)) {
+ throw new HyracksException("Unexpected type of value bound to " + lv + ": " + value.getClass() + "("
+ + value + ")");
+ }
+ int nParts = ((Number) value).intValue();
+ if (nParts <= 0) {
+ throw new HyracksException("Unsatisfiable number of partitions for " + lv + ": " + nParts);
+ }
+ nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
+ }
+ Map<ActivityId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityId, ActivityPartitionDetails>();
+ for (ActivityId anId : ac.getActivities()) {
+ int nParts = nPartMap.get(anId.getOperatorDescriptorId());
+ int[] nInputPartitions = null;
+ List<IConnectorDescriptor> inputs = jobRun.getJobActivityGraph().getActivityInputConnectorDescriptors(anId);
+ if (inputs != null) {
+ nInputPartitions = new int[inputs.size()];
+ for (int i = 0; i < nInputPartitions.length; ++i) {
+ nInputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
+ .getProducerActivity(inputs.get(i).getConnectorId()).getOperatorDescriptorId());
+ }
+ }
+ int[] nOutputPartitions = null;
+ List<IConnectorDescriptor> outputs = jobRun.getJobActivityGraph().getActivityOutputConnectorDescriptors(
+ anId);
+ if (outputs != null) {
+ nOutputPartitions = new int[outputs.size()];
+ for (int i = 0; i < nOutputPartitions.length; ++i) {
+ nOutputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
+ .getConsumerActivity(outputs.get(i).getConnectorId()).getOperatorDescriptorId());
+ }
+ }
+ ActivityPartitionDetails apd = new ActivityPartitionDetails(nParts, nInputPartitions, nOutputPartitions);
+ activityPartsMap.put(anId, apd);
+ }
+ return activityPartsMap;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index ca2fad9..3979656 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -36,7 +36,7 @@
public NodeCapability getNodeCapability() throws Exception;
public void startTasks(String appName, UUID jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception;
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, byte[] ctxVarBytes) throws Exception;
public void abortTasks(UUID jobId, List<TaskAttemptId> tasks) throws Exception;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 8ef0f3e..c932889 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -36,6 +36,7 @@
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.naming.MultipartName;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.api.resources.IDeallocatable;
import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
@@ -64,6 +65,8 @@
private final Map<String, Counter> counterMap;
+ private final Map<MultipartName, Object> localVariableMap;
+
private final DefaultDeallocatableRegistry deallocatableRegistry;
private final IWorkspaceFileFactory fileFactory;
@@ -76,6 +79,7 @@
envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
taskMap = new HashMap<TaskAttemptId, Task>();
counterMap = new HashMap<String, Counter>();
+ localVariableMap = new HashMap<MultipartName, Object>();
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
}
@@ -85,7 +89,7 @@
return jobId;
}
- public IOperatorEnvironment getEnvironment(OperatorDescriptorId opId, int partition) {
+ public synchronized IOperatorEnvironment getEnvironment(OperatorDescriptorId opId, int partition) {
if (!envMap.containsKey(opId)) {
envMap.put(opId, new HashMap<Integer, IOperatorEnvironment>());
}
@@ -104,6 +108,17 @@
return taskMap;
}
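+    // Joblet-local variables are shared by every task attempt of this job on
+    // this node; looking up an unset variable fails fast rather than returning
+    // null.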
+ public synchronized Object lookupLocalVariable(MultipartName name) throws HyracksDataException {
+ if (!localVariableMap.containsKey(name)) {
+ throw new HyracksDataException("Unknown variable: " + name);
+ }
+ return localVariableMap.get(name);
+ }
+
+ public synchronized void setLocalVariable(MultipartName name, Object value) {
+ localVariableMap.put(name, value);
+ }
+
private static final class OperatorEnvironmentImpl implements IOperatorEnvironment {
private final String nodeId;
private final Map<String, Object> map;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3223907..e97eede 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -24,6 +24,7 @@
import java.rmi.registry.Registry;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
@@ -64,7 +65,9 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.naming.MultipartName;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
@@ -230,10 +233,11 @@
@Override
public void startTasks(String appName, final UUID jobId, byte[] jagBytes,
List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) throws Exception {
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, byte[] ctxVarBytes) throws Exception {
try {
NCApplicationContext appCtx = applications.get(appName);
final JobActivityGraph plan = (JobActivityGraph) appCtx.deserialize(jagBytes);
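+            // Context variables are shipped from the CC as a serialized map
+            // alongside the job activity graph.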
+            @SuppressWarnings("unchecked")
+            Map<MultipartName, Object> ctxVarMap = (Map<MultipartName, Object>) appCtx.deserialize(ctxVarBytes);
IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
@Override
@@ -257,10 +261,10 @@
LOGGER.info("Initializing " + taId + " -> " + han);
}
final int partition = tid.getPartition();
+ Map<MultipartName, Object> inputGlobalVariables = createInputGlobalVariables(ctxVarMap, han);
Task task = new Task(joblet, taId, han.getClass().getName(), executor);
- IOperatorNodePushable operator = han.createPushRuntime(task,
- joblet.getEnvironment(han.getActivityId().getOperatorDescriptorId(), partition), rdp,
- partition, td.getPartitionCount());
+                IOperatorEnvironment env = joblet.getEnvironment(tid.getActivityId().getOperatorDescriptorId(),
+                        partition);
+ IOperatorNodePushable operator = han.createPushRuntime(task, env, rdp, partition, td.getPartitionCount());
List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
@@ -308,6 +312,14 @@
}
}
+ private Map<MultipartName, Object> createInputGlobalVariables(Map<MultipartName, Object> ctxVarMap, IActivity han) {
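+        // Placeholder for now: returns an empty map; the commented-out loop below
+        // sketches the intended wiring from the shipped context variables.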
+ Map<MultipartName, Object> gVars = new HashMap<MultipartName, Object>();
+// for (MultipartName inVar : han.getInputVariables()) {
+// gVars.put(inVar, ctxVarMap.get(inVar));
+// }
+ return gVars;
+ }
+
private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
throws HyracksDataException {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index e787a97..e65f59e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.control.nc;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -25,6 +26,7 @@
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -34,7 +36,9 @@
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.naming.MultipartName;
import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.api.workflow.variables.WorkflowVariableDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -57,13 +61,20 @@
private final Map<String, Counter> counterMap;
+ private final Map<MultipartName, Object> inputGlobalVariables;
+
+ private final Map<MultipartName, Object> outputVariables;
+
+ private final Map<MultipartName, WorkflowVariableDescriptor> outputVariableDescriptorMap;
+
private IPartitionCollector[] collectors;
private IOperatorNodePushable operator;
private volatile boolean aborted;
- public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
+ public Task(Joblet joblet, TaskAttemptId taskId, String displayName,
+ Executor executor) {
this.joblet = joblet;
this.taskAttemptId = taskId;
this.displayName = displayName;
@@ -71,6 +82,13 @@
fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
deallocatableRegistry = new DefaultDeallocatableRegistry();
counterMap = new HashMap<String, Counter>();
+// this.inputGlobalVariables = inputGlobalVariables;
+ inputGlobalVariables = Collections.emptyMap();
+ outputVariables = new HashMap<MultipartName, Object>();
+ outputVariableDescriptorMap = new HashMap<MultipartName, WorkflowVariableDescriptor>();
+// for (WorkflowVariableDescriptor wvd : outputVariableDescriptors) {
+// outputVariableDescriptorMap.put(wvd.getName(), wvd);
+// }
}
public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
@@ -108,6 +126,28 @@
deallocatableRegistry.registerDeallocatable(deallocatable);
}
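+    // Variables are addressed by (producer activity, partition, name). Global
+    // lookups consult the task's input-variable map, local lookups delegate to
+    // the joblet, and writes are buffered in outputVariables until the task ends.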
+ @Override
+ public Object lookupGlobalVariable(ActivityId producerActivity, int partition, String varName)
+ throws HyracksDataException {
+ MultipartName var = new MultipartName(producerActivity, partition, varName);
+ if (!inputGlobalVariables.containsKey(var)) {
+ throw new HyracksDataException("Unknown Variable: " + var);
+ }
+ return inputGlobalVariables.get(var);
+ }
+
+ @Override
+ public Object lookupLocalVariable(ActivityId producerActivity, int partition, String varName)
+ throws HyracksDataException {
+ return joblet.lookupLocalVariable(new MultipartName(producerActivity, partition, varName));
+ }
+
+ @Override
+ public void setVariable(String name, Object value) {
+ outputVariables.put(new MultipartName(taskAttemptId.getTaskId().getActivityId(), taskAttemptId.getTaskId()
+ .getPartition(), name), value);
+ }
+
public void close() {
deallocatableRegistry.close();
}
@@ -190,6 +230,23 @@
} finally {
operator.deinitialize();
}
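+            // Flush buffered output variables: each must have a declared
+            // descriptor, whose scope routes it either to the joblet (LOCAL) or
+            // into the set reported for global visibility (GLOBAL).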
+ Map<MultipartName, Object> outputGlobalVariables = new HashMap<MultipartName, Object>();
+ for (Map.Entry<MultipartName, Object> e : outputVariables.entrySet()) {
+ MultipartName varName = e.getKey();
+ WorkflowVariableDescriptor wvd = outputVariableDescriptorMap.get(varName);
+ if (wvd == null) {
+ throw new HyracksDataException("Unknown variable found: " + varName);
+ }
+ switch (wvd.getScope()) {
+ case LOCAL:
+ joblet.setLocalVariable(varName, e.getValue());
+ break;
+
+ case GLOBAL:
+ outputGlobalVariables.put(varName, e.getValue());
+ break;
+ }
+ }
joblet.notifyTaskComplete(this);
} catch (Exception e) {
e.printStackTrace();
@@ -207,7 +264,8 @@
try {
collector.open();
try {
- joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector, PartitionState.STARTED);
+ joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector,
+ PartitionState.STARTED);
IFrameReader reader = collector.getReader();
reader.open();
try {
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index ca37f37..6df880a 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -381,7 +381,7 @@
(Class<? extends Writable>) oldReader.createKey().getClass(),
(Class<? extends Writable>) oldReader.createValue().getClass());
}
- return createSelfReadingMapper(ctx, env, recordDescriptor, partition);
+ return createSelfReadingMapper(ctx, recordDescriptor, partition);
} else {
return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
recordDescProvider.getInputRecordDescriptor(this.odId, 0));
@@ -391,7 +391,7 @@
}
}
- private IOperatorNodePushable createSelfReadingMapper(final IHyracksTaskContext ctx, IOperatorEnvironment env,
+ private IOperatorNodePushable createSelfReadingMapper(final IHyracksTaskContext ctx,
final RecordDescriptor recordDescriptor, final int partition) {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index efb616b..79d9c8a 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -55,7 +55,7 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new BTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, isForward, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive);
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index cbab080..83b608f 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -18,6 +18,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -88,4 +89,21 @@
public TaskAttemptId getTaskAttemptId() {
return taskId;
}
+
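+    // Test stubs: the harness does not track workflow variables, so lookups
+    // simply return null.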
+ @Override
+ public Object lookupGlobalVariable(ActivityId producerActivity, int partition, String varName)
+ throws HyracksDataException {
+ return null;
+ }
+
+ @Override
+ public Object lookupLocalVariable(ActivityId producerActivity, int partition, String varName)
+ throws HyracksDataException {
+ return null;
+ }
+
+ @Override
+ public void setVariable(String name, Object value) throws HyracksDataException {
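+        // no-op: the test harness discards variables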
+
+ }
}
\ No newline at end of file