merged hyracks_asterix_stabilization r1652:1654
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1658 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index d86bf0f..899b633 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -42,7 +42,8 @@
*/
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(IDataSource<S> dataSource,
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
- IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
+ IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
+ throws AlgebricksException;
public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource);
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index db74c41..e34e60d 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -23,6 +23,7 @@
public class HyracksClientInterfaceFunctions {
public enum FunctionId {
GET_CLUSTER_CONTROLLER_INFO,
+ GET_CLUSTER_TOPOLOGY,
CREATE_APPLICATION,
START_APPLICATION,
DESTROY_APPLICATION,
@@ -128,12 +129,12 @@
private static final long serialVersionUID = 1L;
private final String appName;
- private final byte[] jobSpec;
+ private final byte[] acggfBytes;
private final EnumSet<JobFlag> jobFlags;
- public StartJobFunction(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) {
+ public StartJobFunction(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
this.appName = appName;
- this.jobSpec = jobSpec;
+ this.acggfBytes = acggfBytes;
this.jobFlags = jobFlags;
}
@@ -146,8 +147,8 @@
return appName;
}
- public byte[] getJobSpec() {
- return jobSpec;
+ public byte[] getACGGFBytes() {
+ return acggfBytes;
}
public EnumSet<JobFlag> getJobFlags() {
@@ -182,4 +183,13 @@
return FunctionId.GET_NODE_CONTROLLERS_INFO;
}
}
+
+ public static class GetClusterTopologyFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_CLUSTER_TOPOLOGY;
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 7413951..4c06d42 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.RPCInterface;
@@ -68,9 +69,9 @@
}
@Override
- public JobId startJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ public JobId startJob(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception {
HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
- appName, jobSpec, jobFlags);
+ appName, acggfBytes, jobFlags);
return (JobId) rpci.call(ipcHandle, sjf);
}
@@ -86,4 +87,10 @@
HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
}
+
+ @Override
+ public ClusterTopology getClusterTopology() throws Exception {
+ HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf = new HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
+ return (ClusterTopology) rpci.call(ipcHandle, gctf);
+ }
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 33d76ec..227524c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -25,11 +25,14 @@
import org.apache.http.entity.FileEntity;
import org.apache.http.impl.client.DefaultHttpClient;
+import edu.uci.ics.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
import edu.uci.ics.hyracks.ipc.api.RPCInterface;
@@ -100,12 +103,19 @@
@Override
public JobId startJob(String appName, JobSpecification jobSpec) throws Exception {
- return hci.startJob(appName, JavaSerializationUtils.serialize(jobSpec), EnumSet.noneOf(JobFlag.class));
+ return startJob(appName, jobSpec, EnumSet.noneOf(JobFlag.class));
}
@Override
public JobId startJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
- return hci.startJob(appName, JavaSerializationUtils.serialize(jobSpec), jobFlags);
+ JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
+ jobSpec);
+ return startJob(appName, jsacggf, jobFlags);
+ }
+
+ public JobId startJob(String appName, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+ throws Exception {
+ return hci.startJob(appName, JavaSerializationUtils.serialize(acggf), jobFlags);
}
@Override
@@ -117,4 +127,9 @@
public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
return hci.getNodeControllersInfo();
}
+
+ @Override
+ public ClusterTopology getClusterTopology() throws Exception {
+ return hci.getClusterTopology();
+ }
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 72a87c2..bdbb544 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -18,10 +18,12 @@
import java.util.EnumSet;
import java.util.Map;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
/**
* Interface used by clients to communicate with the Hyracks Cluster Controller.
@@ -84,6 +86,20 @@
public JobId startJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
/**
+ * Start the specified Job.
+ *
+ * @param appName
+ * Name of the application
+ * @param acggf
+ * Activity Cluster Graph Generator Factory
+ * @param jobFlags
+ * Flags
+ * @throws Exception
+ */
+ public JobId startJob(String appName, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+ throws Exception;
+
+ /**
* Waits until the specified job has completed, either successfully or has
* encountered a permanent failure.
*
@@ -99,4 +115,12 @@
* @return Map of node name to node information.
*/
public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception;
+
+ /**
+ * Get the cluster topology
+ *
+ * @return the cluster topology
+ * @throws Exception
+ */
+ public ClusterTopology getClusterTopology() throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 43dee05..ef5906e 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
public interface IHyracksClientInterface {
public ClusterControllerInfo getClusterControllerInfo() throws Exception;
@@ -32,9 +33,11 @@
public JobStatus getJobStatus(JobId jobId) throws Exception;
- public JobId startJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+ public JobId startJob(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
public void waitForCompletion(JobId jobId) throws Exception;
public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception;
+
+ public ClusterTopology getClusterTopology() throws Exception;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ActivityClusterGraphBuilder.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
new file mode 100644
index 0000000..6c6c211
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONException;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class ActivityClusterGraphBuilder {
+ private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
+
+ public ActivityClusterGraphBuilder() {
+ }
+
+ private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, Set<Set<ActivityId>> eqSets) {
+ for (Set<ActivityId> eqSet : eqSets) {
+ for (ActivityId t : eqSet) {
+ List<IConnectorDescriptor> inputList = jag.getActivityInputMap().get(t);
+ if (inputList != null) {
+ for (IConnectorDescriptor conn : inputList) {
+ ActivityId inTask = jag.getProducerActivity(conn.getConnectorId());
+ if (!eqSet.contains(inTask)) {
+ return Pair.<ActivityId, ActivityId> of(t, inTask);
+ }
+ }
+ }
+ List<IConnectorDescriptor> outputList = jag.getActivityOutputMap().get(t);
+ if (outputList != null) {
+ for (IConnectorDescriptor conn : outputList) {
+ ActivityId outTask = jag.getConsumerActivity(conn.getConnectorId());
+ if (!eqSet.contains(outTask)) {
+ return Pair.<ActivityId, ActivityId> of(t, outTask);
+ }
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public ActivityClusterGraph inferActivityClusters(JobId jobId, JobActivityGraph jag) {
+ /*
+ * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
+ */
+ Map<ActivityId, Set<ActivityId>> stageMap = new HashMap<ActivityId, Set<ActivityId>>();
+ Set<Set<ActivityId>> stages = new HashSet<Set<ActivityId>>();
+ for (ActivityId taskId : jag.getActivityMap().keySet()) {
+ Set<ActivityId> eqSet = new HashSet<ActivityId>();
+ eqSet.add(taskId);
+ stageMap.put(taskId, eqSet);
+ stages.add(eqSet);
+ }
+
+ boolean changed = true;
+ while (changed) {
+ changed = false;
+ Pair<ActivityId, ActivityId> pair = findMergePair(jag, stages);
+ if (pair != null) {
+ merge(stageMap, stages, pair.getLeft(), pair.getRight());
+ changed = true;
+ }
+ }
+
+ ActivityClusterGraph acg = new ActivityClusterGraph();
+ Map<ActivityId, ActivityCluster> acMap = new HashMap<ActivityId, ActivityCluster>();
+ int acCounter = 0;
+ Map<ActivityId, IActivity> activityNodeMap = jag.getActivityMap();
+ List<ActivityCluster> acList = new ArrayList<ActivityCluster>();
+ for (Set<ActivityId> stage : stages) {
+ ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(jobId, acCounter++));
+ acList.add(ac);
+ for (ActivityId aid : stage) {
+ IActivity activity = activityNodeMap.get(aid);
+ ac.addActivity(activity);
+ acMap.put(aid, ac);
+ }
+ }
+
+ for (Set<ActivityId> stage : stages) {
+ for (ActivityId aid : stage) {
+ IActivity activity = activityNodeMap.get(aid);
+ ActivityCluster ac = acMap.get(aid);
+ List<IConnectorDescriptor> aOutputs = jag.getActivityOutputMap().get(aid);
+ if (aOutputs == null || aOutputs.isEmpty()) {
+ ac.addRoot(activity);
+ } else {
+ int nActivityOutputs = aOutputs.size();
+ for (int i = 0; i < nActivityOutputs; ++i) {
+ IConnectorDescriptor conn = aOutputs.get(i);
+ ac.addConnector(conn);
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> pcPair = jag.getConnectorActivityMap()
+ .get(conn.getConnectorId());
+ ac.connect(conn, activity, i, pcPair.getRight().getLeft(), pcPair.getRight().getRight(), jag
+ .getConnectorRecordDescriptorMap().get(conn.getConnectorId()));
+ }
+ }
+ }
+ }
+
+ Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = jag.getBlocked2BlockerMap();
+ for (ActivityCluster s : acList) {
+ Map<ActivityId, Set<ActivityId>> acBlocked2BlockerMap = s.getBlocked2BlockerMap();
+ Set<ActivityCluster> blockerStages = new HashSet<ActivityCluster>();
+ for (ActivityId t : s.getActivityMap().keySet()) {
+ Set<ActivityId> blockerTasks = blocked2BlockerMap.get(t);
+ acBlocked2BlockerMap.put(t, blockerTasks);
+ if (blockerTasks != null) {
+ for (ActivityId bt : blockerTasks) {
+ blockerStages.add(acMap.get(bt));
+ }
+ }
+ }
+ for (ActivityCluster bs : blockerStages) {
+ s.getDependencies().add(bs);
+ }
+ }
+ acg.addActivityClusters(acList);
+
+ if (LOGGER.isLoggable(Level.FINE)) {
+ try {
+ LOGGER.fine(acg.toJSON().toString(2));
+ } catch (JSONException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ return acg;
+ }
+
+ private void merge(Map<ActivityId, Set<ActivityId>> eqSetMap, Set<Set<ActivityId>> eqSets, ActivityId t1,
+ ActivityId t2) {
+ Set<ActivityId> stage1 = eqSetMap.get(t1);
+ Set<ActivityId> stage2 = eqSetMap.get(t2);
+
+ Set<ActivityId> mergedSet = new HashSet<ActivityId>();
+ mergedSet.addAll(stage1);
+ mergedSet.addAll(stage2);
+
+ eqSets.remove(stage1);
+ eqSets.remove(stage2);
+ eqSets.add(mergedSet);
+
+ for (ActivityId t : mergedSet) {
+ eqSetMap.put(t, mergedSet);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IConnectorDescriptorVisitor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IConnectorDescriptorVisitor.java
similarity index 94%
rename from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IConnectorDescriptorVisitor.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IConnectorDescriptorVisitor.java
index 1c8446f..5b508b3 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IConnectorDescriptorVisitor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IConnectorDescriptorVisitor.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.client.impl;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IOperatorDescriptorVisitor.java
similarity index 94%
rename from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IOperatorDescriptorVisitor.java
index 840faef..fcdf9f0 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IOperatorDescriptorVisitor.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.client.impl;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobActivityGraphBuilder.java
similarity index 84%
rename from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobActivityGraphBuilder.java
index a7b60e4..b4b9b3c 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobActivityGraphBuilder.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.client.impl;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -35,14 +35,10 @@
private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorConsumerMap;
- public JobActivityGraphBuilder(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+ public JobActivityGraphBuilder(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
activityOperatorMap = new HashMap<ActivityId, IOperatorDescriptor>();
- jag = new JobActivityGraph(appName, jobFlags);
+ jag = new JobActivityGraph();
this.jobSpec = jobSpec;
- jag.setConnectorPolicyAssignmentPolicy(jobSpec.getConnectorPolicyAssignmentPolicy());
- jag.setGlobalJobDataFactory(jobSpec.getGlobalJobDataFactory());
- jag.setJobletEventListenerFactory(jobSpec.getJobletEventListenerFactory());
- jag.setMaxReattempts(jobSpec.getMaxReattempts());
connectorProducerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
connectorConsumerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
}
@@ -54,15 +50,14 @@
@Override
public void addBlockingEdge(IActivity blocker, IActivity blocked) {
- addToValueSet(jag.getBlocker2BlockedMap(), blocker.getActivityId(), blocked.getActivityId());
addToValueSet(jag.getBlocked2BlockerMap(), blocked.getActivityId(), blocker.getActivityId());
}
@Override
public void addSourceEdge(int operatorInputIndex, IActivity task, int taskInputIndex) {
if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Adding source edge: " + task.getActivityId().getOperatorDescriptorId() + ":"
- + operatorInputIndex + " -> " + task.getActivityId() + ":" + taskInputIndex);
+ LOGGER.finest("Adding source edge: " + task.getActivityId() + ":" + operatorInputIndex + " -> "
+ + task.getActivityId() + ":" + taskInputIndex);
}
IOperatorDescriptor op = activityOperatorMap.get(task.getActivityId());
IConnectorDescriptor conn = jobSpec.getInputConnectorDescriptor(op, operatorInputIndex);
@@ -73,8 +68,8 @@
@Override
public void addTargetEdge(int operatorOutputIndex, IActivity task, int taskOutputIndex) {
if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Adding target edge: " + task.getActivityId().getOperatorDescriptorId() + ":"
- + operatorOutputIndex + " -> " + task.getActivityId() + ":" + taskOutputIndex);
+ LOGGER.finest("Adding target edge: " + task.getActivityId() + ":" + operatorOutputIndex + " -> "
+ + task.getActivityId() + ":" + taskOutputIndex);
}
IOperatorDescriptor op = activityOperatorMap.get(task.getActivityId());
IConnectorDescriptor conn = jobSpec.getOutputConnectorDescriptor(op, operatorOutputIndex);
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
new file mode 100644
index 0000000..31c3d36
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -0,0 +1,87 @@
+package edu.uci.ics.hyracks.api.client.impl;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobSpecificationActivityClusterGraphGeneratorFactory implements IActivityClusterGraphGeneratorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private final JobSpecification spec;
+
+ public JobSpecificationActivityClusterGraphGeneratorFactory(JobSpecification jobSpec) {
+ this.spec = jobSpec;
+ }
+
+ @Override
+ public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(String appName, JobId jobId,
+ final ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException {
+ final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags);
+ PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+ @Override
+ public void visit(IConnectorDescriptor conn) throws HyracksException {
+ builder.addConnector(conn);
+ }
+ });
+ PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+ @Override
+ public void visit(IOperatorDescriptor op) {
+ op.contributeActivities(builder);
+ }
+ });
+ builder.finish();
+ final JobActivityGraph jag = builder.getActivityGraph();
+ ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder();
+
+ final ActivityClusterGraph acg = acgb.inferActivityClusters(jobId, jag);
+ acg.setMaxReattempts(spec.getMaxReattempts());
+ acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
+ acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
+ final Set<Constraint> constraints = new HashSet<Constraint>();
+ final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
+ @Override
+ public void addConstraint(Constraint constraint) {
+ constraints.add(constraint);
+ }
+ };
+ PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+ @Override
+ public void visit(IOperatorDescriptor op) {
+ op.contributeSchedulingConstraints(acceptor, ccAppCtx);
+ }
+ });
+ PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+ @Override
+ public void visit(IConnectorDescriptor conn) {
+ conn.contributeSchedulingConstraints(acceptor, acg.getConnectorMap().get(conn.getConnectorId()),
+ ccAppCtx);
+ }
+ });
+ constraints.addAll(spec.getUserConstraints());
+ return new IActivityClusterGraphGenerator() {
+ @Override
+ public ActivityClusterGraph initialize() {
+ return acg;
+ }
+
+ @Override
+ public Set<Constraint> getConstraints() {
+ return constraints;
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/PlanUtils.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/PlanUtils.java
similarity index 97%
rename from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/PlanUtils.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/PlanUtils.java
index 754094a..30f26c7 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/PlanUtils.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/PlanUtils.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.client.impl;
import java.util.HashSet;
import java.util.Set;
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
index 7466276..1e49fe2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
@@ -18,9 +18,12 @@
import java.util.Set;
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
public interface ICCContext {
public ClusterControllerInfo getClusterControllerInfo();
public void getIPAddressNodeMap(Map<String, Set<String>> map) throws Exception;
+
+ public ClusterTopology getClusterTopology();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 0ac3658..3a6ee7a 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -28,7 +28,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
/**
* Connector that connects operators in a Job.
@@ -91,10 +91,10 @@
*
* @param constraintAcceptor
* - Constraint Acceptor
- * @param plan
- * - Job Plan
+ * @param ac
+ * - Activity Cluster
*/
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ActivityCluster ac,
ICCApplicationContext appCtx);
/**
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index c37a530..43bf141 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -22,7 +22,6 @@
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
/**
* Descriptor for operators in Hyracks.
@@ -75,8 +74,7 @@
* @param plan
* - Job Plan
*/
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
- ICCApplicationContext appCtx);
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx);
/**
* Gets the display name.
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
new file mode 100644
index 0000000..6698ff7
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public final class ActivityCluster implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final ActivityClusterGraph acg;
+
+ private final ActivityClusterId id;
+
+ private final List<IActivity> roots;
+
+ private final Map<ActivityId, IActivity> activities;
+
+ private final Map<ConnectorDescriptorId, IConnectorDescriptor> connectors;
+
+ private final Map<ConnectorDescriptorId, RecordDescriptor> connectorRecordDescriptorMap;
+
+ private final Map<ActivityId, List<IConnectorDescriptor>> activityInputMap;
+
+ private final Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap;
+
+ private final Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap;
+
+ private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
+
+ private final List<ActivityCluster> dependencies;
+
+ private IConnectorPolicyAssignmentPolicy cpap;
+
+ public ActivityCluster(ActivityClusterGraph acg, ActivityClusterId id) {
+ this.acg = acg;
+ this.id = id;
+ roots = new ArrayList<IActivity>();
+ activities = new HashMap<ActivityId, IActivity>();
+ connectors = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
+ connectorRecordDescriptorMap = new HashMap<ConnectorDescriptorId, RecordDescriptor>();
+ activityInputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+ activityOutputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+ connectorActivityMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
+ blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+ dependencies = new ArrayList<ActivityCluster>();
+ }
+
+ public ActivityClusterGraph getActivityClusterGraph() {
+ return acg;
+ }
+
+ public ActivityClusterId getId() {
+ return id;
+ }
+
+ public void addRoot(IActivity activity) {
+ roots.add(activity);
+ }
+
+ public void addActivity(IActivity activity) {
+ activities.put(activity.getActivityId(), activity);
+ }
+
+ public void addConnector(IConnectorDescriptor connector) {
+ connectors.put(connector.getConnectorId(), connector);
+ }
+
+ public void connect(IConnectorDescriptor connector, IActivity producerActivity, int producerPort,
+ IActivity consumerActivity, int consumerPort, RecordDescriptor recordDescriptor) {
+ if (!activities.containsKey(producerActivity.getActivityId())
+ || !activities.containsKey(consumerActivity.getActivityId())) {
+ throw new IllegalStateException("Connected Activities belong to different Activity Clusters: "
+ + producerActivity.getActivityId() + " and " + consumerActivity.getActivityId());
+ }
+ insertIntoIndexedMap(activityInputMap, consumerActivity.getActivityId(), consumerPort, connector);
+ insertIntoIndexedMap(activityOutputMap, producerActivity.getActivityId(), producerPort, connector);
+ connectorActivityMap.put(
+ connector.getConnectorId(),
+ Pair.<Pair<IActivity, Integer>, Pair<IActivity, Integer>> of(
+ Pair.<IActivity, Integer> of(producerActivity, producerPort),
+ Pair.<IActivity, Integer> of(consumerActivity, consumerPort)));
+ connectorRecordDescriptorMap.put(connector.getConnectorId(), recordDescriptor);
+ }
+
+ public List<IActivity> getRoots() {
+ return roots;
+ }
+
+ public Map<ActivityId, IActivity> getActivityMap() {
+ return activities;
+ }
+
+ public Map<ConnectorDescriptorId, IConnectorDescriptor> getConnectorMap() {
+ return connectors;
+ }
+
+ public Map<ConnectorDescriptorId, RecordDescriptor> getConnectorRecordDescriptorMap() {
+ return connectorRecordDescriptorMap;
+ }
+
+ public Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> getConnectorActivityMap() {
+ return connectorActivityMap;
+ }
+
+ public Map<ActivityId, List<IConnectorDescriptor>> getActivityInputMap() {
+ return activityInputMap;
+ }
+
+ public Map<ActivityId, List<IConnectorDescriptor>> getActivityOutputMap() {
+ return activityOutputMap;
+ }
+
+ public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) {
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+ return connEdge.getRight().getLeft().getActivityId();
+ }
+
+ public ActivityId getProducerActivity(ConnectorDescriptorId cdId) {
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+ return connEdge.getLeft().getLeft().getActivityId();
+ }
+
+ public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() {
+ return blocked2blockerMap;
+ }
+
+ public List<ActivityCluster> getDependencies() {
+ return dependencies;
+ }
+
+ public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+ return cpap;
+ }
+
+ public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy cpap) {
+ this.cpap = cpap;
+ }
+
+ private <T> void extend(List<T> list, int index) {
+ int n = list.size();
+ for (int i = n; i <= index; ++i) {
+ list.add(null);
+ }
+ }
+
+ private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
+ List<V> vList = map.get(key);
+ if (vList == null) {
+ vList = new ArrayList<V>();
+ map.put(key, vList);
+ }
+ extend(vList, index);
+ vList.set(index, value);
+ }
+
+ public JSONObject toJSON() throws JSONException {
+ JSONObject jac = new JSONObject();
+
+ JSONArray jans = new JSONArray();
+ for (IActivity an : activities.values()) {
+ JSONObject jan = new JSONObject();
+ jan.put("id", an.getActivityId().toString());
+ jan.put("java-class", an.getClass().getName());
+
+ List<IConnectorDescriptor> inputs = activityInputMap.get(an.getActivityId());
+ if (inputs != null) {
+ JSONArray jInputs = new JSONArray();
+ for (int i = 0; i < inputs.size(); ++i) {
+ JSONObject jInput = new JSONObject();
+ jInput.put("input-port", i);
+ jInput.put("connector-id", inputs.get(i).getConnectorId().toString());
+ jInputs.put(jInput);
+ }
+ jan.put("inputs", jInputs);
+ }
+
+ List<IConnectorDescriptor> outputs = activityOutputMap.get(an.getActivityId());
+ if (outputs != null) {
+ JSONArray jOutputs = new JSONArray();
+ for (int i = 0; i < outputs.size(); ++i) {
+ JSONObject jOutput = new JSONObject();
+ jOutput.put("output-port", i);
+ jOutput.put("connector-id", outputs.get(i).getConnectorId().toString());
+ jOutputs.put(jOutput);
+ }
+ jan.put("outputs", jOutputs);
+ }
+
+ Set<ActivityId> blockers = getBlocked2BlockerMap().get(an.getActivityId());
+ if (blockers != null) {
+ JSONArray jDeps = new JSONArray();
+ for (ActivityId blocker : blockers) {
+ jDeps.put(blocker.toString());
+ }
+ jan.put("depends-on", jDeps);
+ }
+ jans.put(jan);
+ }
+ jac.put("activities", jans);
+
+ return jac;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
new file mode 100644
index 0000000..7285956
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -0,0 +1,117 @@
+/*
+2 * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+
+public class ActivityClusterGraph implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private int version;
+
+ private final Map<ActivityClusterId, ActivityCluster> activityClusterMap;
+
+ private final Map<ActivityId, ActivityCluster> activityMap;
+
+ private final Map<ConnectorDescriptorId, ActivityCluster> connectorMap;
+
+ private int maxReattempts;
+
+ private IJobletEventListenerFactory jobletEventListenerFactory;
+
+ private IGlobalJobDataFactory globalJobDataFactory;
+
+ public ActivityClusterGraph() {
+ version = 0;
+ activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
+ activityMap = new HashMap<ActivityId, ActivityCluster>();
+ connectorMap = new HashMap<ConnectorDescriptorId, ActivityCluster>();
+ }
+
+ public Map<ActivityId, ActivityCluster> getActivityMap() {
+ return activityMap;
+ }
+
+ public Map<ConnectorDescriptorId, ActivityCluster> getConnectorMap() {
+ return connectorMap;
+ }
+
+ public Map<ActivityClusterId, ActivityCluster> getActivityClusterMap() {
+ return activityClusterMap;
+ }
+
+ public void addActivityClusters(Collection<ActivityCluster> newActivityClusters) {
+ for (ActivityCluster ac : newActivityClusters) {
+ activityClusterMap.put(ac.getId(), ac);
+ for (ActivityId aid : ac.getActivityMap().keySet()) {
+ activityMap.put(aid, ac);
+ }
+ for (ConnectorDescriptorId cid : ac.getConnectorMap().keySet()) {
+ connectorMap.put(cid, ac);
+ }
+ }
+ ++version;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setMaxReattempts(int maxReattempts) {
+ this.maxReattempts = maxReattempts;
+ }
+
+ public int getMaxReattempts() {
+ return maxReattempts;
+ }
+
+ public IJobletEventListenerFactory getJobletEventListenerFactory() {
+ return jobletEventListenerFactory;
+ }
+
+ public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
+ this.jobletEventListenerFactory = jobletEventListenerFactory;
+ }
+
+ public IGlobalJobDataFactory getGlobalJobDataFactory() {
+ return globalJobDataFactory;
+ }
+
+ public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
+ this.globalJobDataFactory = globalJobDataFactory;
+ }
+
+ public JSONObject toJSON() throws JSONException {
+ JSONObject acgj = new JSONObject();
+
+ JSONArray acl = new JSONArray();
+ for (ActivityCluster ac : activityClusterMap.values()) {
+ acl.put(ac.toJSON());
+ }
+ acgj.put("version", version);
+ acgj.put("activity-clusters", acl);
+ return acgj;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterId.java
similarity index 65%
rename from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterId.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterId.java
index 46628a4..cba63959 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityClusterId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterId.java
@@ -12,19 +12,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.job;
import java.io.Serializable;
public final class ActivityClusterId implements Serializable {
private static final long serialVersionUID = 1L;
+ private final JobId jobId;
+
private final int id;
- public ActivityClusterId(int id) {
+ public ActivityClusterId(JobId jobId, int id) {
+ this.jobId = jobId;
this.id = id;
}
+ public JobId getJobId() {
+ return jobId;
+ }
+
public int getId() {
return id;
}
@@ -34,25 +41,37 @@
final int prime = 31;
int result = 1;
result = prime * result + id;
+ result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
- if (this == obj)
+ if (this == obj) {
return true;
- if (obj == null)
+ }
+ if (obj == null) {
return false;
- if (getClass() != obj.getClass())
+ }
+ if (getClass() != obj.getClass()) {
return false;
+ }
ActivityClusterId other = (ActivityClusterId) obj;
- if (id != other.id)
+ if (id != other.id) {
return false;
+ }
+ if (jobId == null) {
+ if (other.jobId != null) {
+ return false;
+ }
+ } else if (!jobId.equals(other.jobId)) {
+ return false;
+ }
return true;
}
@Override
public String toString() {
- return "AC:" + id;
+ return "ACID:" + jobId + ":" + id;
}
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGenerator.java
similarity index 68%
copy from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGenerator.java
index 840faef..b837066 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGenerator.java
@@ -12,11 +12,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.job;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import java.util.Set;
-public interface IOperatorDescriptorVisitor {
- public void visit(IOperatorDescriptor op) throws HyracksException;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+
+public interface IActivityClusterGraphGenerator {
+ public Set<Constraint> getConstraints();
+
+ public ActivityClusterGraph initialize();
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
similarity index 60%
copy from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
index 840faef..d801dd1 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -12,11 +12,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.job;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import java.io.Serializable;
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-public interface IOperatorDescriptorVisitor {
- public void visit(IOperatorDescriptor op) throws HyracksException;
+public interface IActivityClusterGraphGeneratorFactory extends Serializable {
+ public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(String appName, JobId jobId,
+ ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
index c0d2ef8..2f0f025 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
@@ -17,7 +17,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
public interface IJobLifecycleListener {
- public void notifyJobCreation(JobId jobId, IOperatorDescriptorRegistry jobSpec) throws HyracksException;
+ public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException;
public void notifyJobStart(JobId jobId) throws HyracksException;
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
index 3c6a547..5b62b5c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
@@ -15,31 +15,22 @@
package edu.uci.ics.hyracks.api.job;
import java.io.Serializable;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
public class JobActivityGraph implements Serializable {
private static final long serialVersionUID = 1L;
- private final String appName;
-
- private final EnumSet<JobFlag> jobFlags;
-
private final Map<ActivityId, IActivity> activityMap;
private final Map<ConnectorDescriptorId, IConnectorDescriptor> connectorMap;
@@ -52,39 +43,18 @@
private final Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap;
- private final Map<ActivityId, Set<ActivityId>> blocker2blockedMap;
-
private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
- private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
-
- private int maxReattempts;
-
- private IJobletEventListenerFactory jobletEventListenerFactory;
-
- private IGlobalJobDataFactory globalJobDataFactory;
-
- public JobActivityGraph(String appName, EnumSet<JobFlag> jobFlags) {
- this.appName = appName;
- this.jobFlags = jobFlags;
+ public JobActivityGraph() {
activityMap = new HashMap<ActivityId, IActivity>();
connectorMap = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
connectorRecordDescriptorMap = new HashMap<ConnectorDescriptorId, RecordDescriptor>();
activityInputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
activityOutputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
connectorActivityMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
- blocker2blockedMap = new HashMap<ActivityId, Set<ActivityId>>();
blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
}
- public String getApplicationName() {
- return appName;
- }
-
- public EnumSet<JobFlag> getJobFlags() {
- return jobFlags;
- }
-
public Map<ActivityId, IActivity> getActivityMap() {
return activityMap;
}
@@ -97,10 +67,6 @@
return connectorRecordDescriptorMap;
}
- public Map<ActivityId, Set<ActivityId>> getBlocker2BlockedMap() {
- return blocker2blockedMap;
- }
-
public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() {
return blocked2blockerMap;
}
@@ -127,97 +93,13 @@
return connEdge.getLeft().getLeft().getActivityId();
}
- public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
- return connectorPolicyAssignmentPolicy;
- }
-
- public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy) {
- this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
- }
-
- public void setMaxReattempts(int maxReattempts) {
- this.maxReattempts = maxReattempts;
- }
-
- public int getMaxReattempts() {
- return maxReattempts;
- }
-
- public IJobletEventListenerFactory getJobletEventListenerFactory() {
- return jobletEventListenerFactory;
- }
-
- public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
- this.jobletEventListenerFactory = jobletEventListenerFactory;
- }
-
- public IGlobalJobDataFactory getGlobalJobDataFactory() {
- return globalJobDataFactory;
- }
-
- public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
- this.globalJobDataFactory = globalJobDataFactory;
- }
-
@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append("ActivityNodes: " + activityMap);
buffer.append('\n');
- buffer.append("Blocker->Blocked: " + blocker2blockedMap);
- buffer.append('\n');
- buffer.append("Blocked->Blocker: " + blocked2blockerMap);
+ buffer.append("Blocker->Blocked: " + blocked2blockerMap);
buffer.append('\n');
return buffer.toString();
}
-
- public JSONObject toJSON() throws JSONException {
- JSONObject jplan = new JSONObject();
-
- jplan.put("flags", jobFlags.toString());
-
- JSONArray jans = new JSONArray();
- for (IActivity an : activityMap.values()) {
- JSONObject jan = new JSONObject();
- jan.put("id", an.getActivityId().toString());
- jan.put("java-class", an.getClass().getName());
-
- List<IConnectorDescriptor> inputs = activityInputMap.get(an.getActivityId());
- if (inputs != null) {
- JSONArray jInputs = new JSONArray();
- for (int i = 0; i < inputs.size(); ++i) {
- JSONObject jInput = new JSONObject();
- jInput.put("input-port", i);
- jInput.put("connector-id", inputs.get(i).getConnectorId().toString());
- jInputs.put(jInput);
- }
- jan.put("inputs", jInputs);
- }
-
- List<IConnectorDescriptor> outputs = activityOutputMap.get(an.getActivityId());
- if (outputs != null) {
- JSONArray jOutputs = new JSONArray();
- for (int i = 0; i < outputs.size(); ++i) {
- JSONObject jOutput = new JSONObject();
- jOutput.put("output-port", i);
- jOutput.put("connector-id", outputs.get(i).getConnectorId().toString());
- jOutputs.put(jOutput);
- }
- jan.put("outputs", jOutputs);
- }
-
- Set<ActivityId> blockers = getBlocked2BlockerMap().get(an.getActivityId());
- if (blockers != null) {
- JSONArray jDeps = new JSONArray();
- for (ActivityId blocker : blockers) {
- jDeps.put(blocker.toString());
- }
- jan.put("depends-on", jDeps);
- }
- jans.put(jan);
- }
- jplan.put("activities", jans);
-
- return jplan;
- }
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/ClusterTopology.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/ClusterTopology.java
new file mode 100644
index 0000000..6f566d1
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/ClusterTopology.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class ClusterTopology implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final NetworkSwitch clusterSwitch;
+
+ public ClusterTopology(NetworkSwitch clusterSwitch) {
+ this.clusterSwitch = clusterSwitch;
+ }
+
+ public NetworkSwitch getClusterSwitch() {
+ return clusterSwitch;
+ }
+
+ public boolean lookupNetworkTerminal(String terminalName, List<Integer> path) {
+ return clusterSwitch.lookupNetworkTerminal(terminalName, path);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkEndpoint.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkEndpoint.java
new file mode 100644
index 0000000..0aaee2a
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkEndpoint.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public abstract class NetworkEndpoint implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public enum EndpointType {
+ NETWORK_SWITCH,
+ NETWORK_TERMINAL,
+ }
+
+ protected final String name;
+
+ protected final Map<String, String> properties;
+
+ public NetworkEndpoint(String name, Map<String, String> properties) {
+ this.name = name;
+ this.properties = properties;
+ }
+
+ public abstract EndpointType getType();
+
+ public final String getName() {
+ return name;
+ }
+
+ public final Map<String, String> getProperties() {
+ return properties;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkSwitch.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkSwitch.java
new file mode 100644
index 0000000..3c89044
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkSwitch.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class NetworkSwitch extends NetworkEndpoint {
+ private static final long serialVersionUID = 1L;
+
+ private final Port[] ports;
+
+ private final Map<String, Integer> terminalNamePortIndexMap;
+
+ public NetworkSwitch(String name, Map<String, String> properties, Port[] ports) {
+ super(name, properties);
+ this.ports = ports;
+ terminalNamePortIndexMap = new HashMap<String, Integer>();
+ for (int i = 0; i < ports.length; ++i) {
+ Port port = ports[i];
+ NetworkEndpoint endpoint = port.getEndpoint();
+ Integer portIndex = Integer.valueOf(i);
+ switch (endpoint.getType()) {
+ case NETWORK_SWITCH: {
+ NetworkSwitch s = (NetworkSwitch) endpoint;
+ for (String t : s.terminalNamePortIndexMap.keySet()) {
+ terminalNamePortIndexMap.put(t, portIndex);
+ }
+ break;
+ }
+
+ case NETWORK_TERMINAL: {
+ NetworkTerminal t = (NetworkTerminal) endpoint;
+ terminalNamePortIndexMap.put(t.getName(), portIndex);
+ break;
+ }
+ }
+ }
+ }
+
+ public Port[] getPorts() {
+ return ports;
+ }
+
+ @Override
+ public EndpointType getType() {
+ return EndpointType.NETWORK_SWITCH;
+ }
+
+ boolean lookupNetworkTerminal(String terminalName, List<Integer> path) {
+ if (terminalNamePortIndexMap.containsKey(terminalName)) {
+ Integer portIndex = terminalNamePortIndexMap.get(terminalName);
+ path.add(portIndex);
+ NetworkEndpoint endpoint = ports[portIndex.intValue()].getEndpoint();
+ if (endpoint.getType() == EndpointType.NETWORK_SWITCH) {
+ ((NetworkSwitch) endpoint).lookupNetworkTerminal(terminalName, path);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ void getPortList(List<Integer> path, int stepIndex, List<Port> portList) {
+ if (stepIndex >= path.size()) {
+ return;
+ }
+ int portIndex = path.get(stepIndex);
+ Port port = ports[portIndex];
+ portList.add(port);
+ ++stepIndex;
+ if (stepIndex >= path.size()) {
+ return;
+ }
+ NetworkEndpoint endpoint = port.getEndpoint();
+ if (endpoint.getType() != EndpointType.NETWORK_SWITCH) {
+ throw new IllegalArgumentException("Path provided, " + path + ", longer than depth of topology tree");
+ }
+ ((NetworkSwitch) endpoint).getPortList(path, stepIndex, portList);
+ }
+
+ public static class Port implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final NetworkEndpoint endpoint;
+
+ public Port(NetworkEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public NetworkEndpoint getEndpoint() {
+ return endpoint;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkTerminal.java
similarity index 61%
copy from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkTerminal.java
index 840faef..f36da76 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/IOperatorDescriptorVisitor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkTerminal.java
@@ -12,11 +12,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.cc.job;
+package edu.uci.ics.hyracks.api.topology;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import java.util.Map;
-public interface IOperatorDescriptorVisitor {
- public void visit(IOperatorDescriptor op) throws HyracksException;
+public class NetworkTerminal extends NetworkEndpoint {
+ private static final long serialVersionUID = 1L;
+
+ public NetworkTerminal(String name, Map<String, String> properties) {
+ super(name, properties);
+ }
+
+ @Override
+ public EndpointType getType() {
+ return EndpointType.NETWORK_TERMINAL;
+ }
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/TopologyDefinitionParser.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/TopologyDefinitionParser.java
new file mode 100644
index 0000000..0ca6127
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/TopologyDefinitionParser.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.xml.sax.Attributes;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.DefaultHandler;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+import edu.uci.ics.hyracks.api.topology.NetworkEndpoint.EndpointType;
+
+public class TopologyDefinitionParser {
+ private final Stack<ElementStackEntry> stack;
+
+ private boolean inPropertyElement;
+
+ private TopologyDefinitionParser() {
+ stack = new Stack<ElementStackEntry>();
+ inPropertyElement = false;
+ }
+
+ public static ClusterTopology parse(InputSource in) throws IOException, SAXException {
+ TopologyDefinitionParser parser = new TopologyDefinitionParser();
+ return parser.parseInternal(in);
+ }
+
+ private ClusterTopology parseInternal(InputSource in) throws IOException, SAXException {
+ XMLReader parser;
+ parser = XMLReaderFactory.createXMLReader();
+ SAXContentHandler handler = new SAXContentHandler();
+ parser.setContentHandler(handler);
+ parser.parse(in);
+ if (stack.size() != 1) {
+ throw new IllegalStateException("Malformed topology definition");
+ }
+ ElementStackEntry e = stack.pop();
+ if (e.ports.size() != 1) {
+ throw new IllegalArgumentException("Malformed topology definition");
+ }
+ NetworkEndpoint endpoint = e.ports.get(0).getEndpoint();
+ if (endpoint.getType() != EndpointType.NETWORK_SWITCH) {
+ throw new IllegalArgumentException("Top level content in cluster-topology must be network-switch");
+ }
+ return new ClusterTopology((NetworkSwitch) endpoint);
+ }
+
+ private class SAXContentHandler extends DefaultHandler {
+ @Override
+ public void endElement(String uri, String localName, String qName) throws SAXException {
+ if ("network-switch".equals(localName) || "terminal".equals(localName)) {
+ ElementStackEntry e = stack.pop();
+ NetworkEndpoint endpoint = e.type == EndpointType.NETWORK_SWITCH ? new NetworkSwitch(e.name,
+ e.properties, e.ports.toArray(new NetworkSwitch.Port[e.ports.size()])) : new NetworkTerminal(
+ e.name, e.properties);
+ stack.peek().ports.add(new NetworkSwitch.Port(endpoint));
+ } else if ("property".equals(localName)) {
+ if (!inPropertyElement) {
+ throw new IllegalStateException("Improperly nested property element encountered");
+ }
+ inPropertyElement = false;
+ }
+ }
+
+ @Override
+ public void startElement(String uri, String localName, String qName, Attributes atts) throws SAXException {
+ if ("cluster-topology".equals(localName)) {
+ if (!stack.isEmpty()) {
+ throw new IllegalStateException("Encountered unexpected " + qName);
+ }
+ stack.push(new ElementStackEntry(null, ""));
+ } else if ("network-switch".equals(localName) || "terminal".equals(localName)) {
+ String name = atts.getValue("", "name");
+ if (name == null) {
+ throw new IllegalStateException("Encountered " + localName + " element with no name attribute");
+ }
+ EndpointType type = "network-switch".equals(localName) ? EndpointType.NETWORK_SWITCH
+ : EndpointType.NETWORK_TERMINAL;
+ ElementStackEntry e = new ElementStackEntry(type, name);
+ stack.push(e);
+ } else if ("property".equals(localName)) {
+ if (inPropertyElement) {
+ throw new IllegalStateException("Improperly nested property element encountered");
+ }
+ String name = atts.getValue("", "name");
+ if (name == null) {
+ throw new IllegalStateException("Encountered " + localName + " element with no name attribute");
+ }
+ String value = atts.getValue("", "value");
+ if (value == null) {
+ throw new IllegalStateException("Encountered " + localName + " element with no value attribute");
+ }
+ stack.peek().properties.put(name, value);
+ inPropertyElement = true;
+ }
+ }
+ }
+
+ private static class ElementStackEntry {
+ private final EndpointType type;
+
+ private final String name;
+
+ private final Map<String, String> properties;
+
+ private final List<NetworkSwitch.Port> ports;
+
+ public ElementStackEntry(EndpointType type, String name) {
+ this.type = type;
+ this.name = name;
+ this.properties = new HashMap<String, String>();
+ ports = new ArrayList<NetworkSwitch.Port>();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index a9a69fa..32b031d 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.control.cc;
import java.io.File;
+import java.io.FileReader;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Hashtable;
@@ -29,12 +30,16 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.xml.sax.InputSource;
+
import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.web.WebServer;
@@ -137,6 +142,7 @@
};
workQueue = new WorkQueue();
this.timer = new Timer(true);
+ final ClusterTopology topology = computeClusterTopology(ccConfig);
ccContext = new ICCContext() {
@Override
public void getIPAddressNodeMap(Map<String, Set<String>> map) throws Exception {
@@ -148,11 +154,29 @@
public ClusterControllerInfo getClusterControllerInfo() {
return info;
}
+
+ @Override
+ public ClusterTopology getClusterTopology() {
+ return topology;
+ }
};
sweeper = new DeadNodeSweeper();
jobCounter = 0;
}
+ private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
+ if (ccConfig.clusterTopologyDefinition == null) {
+ return null;
+ }
+ FileReader fr = new FileReader(ccConfig.clusterTopologyDefinition);
+ InputSource in = new InputSource(fr);
+ try {
+ return TopologyDefinitionParser.parse(in);
+ } finally {
+ fr.close();
+ }
+ }
+
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
@@ -292,7 +316,7 @@
HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
JobId jobId = createJobId();
workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getAppName(), sjf
- .getJobSpec(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
+ .getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
return;
}
@@ -308,6 +332,15 @@
new IPCResponder<Map<String, NodeControllerInfo>>(handle, mid)));
return;
}
+
+ case GET_CLUSTER_TOPOLOGY: {
+ try {
+ handle.send(mid, ccContext.getClusterTopology(), null);
+ } catch (IPCException e) {
+ e.printStackTrace();
+ }
+ return;
+ }
}
try {
handle.send(mid, null, new IllegalArgumentException("Unknown function " + fn.getFunctionId()));
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
index a486c33..e165415 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
@@ -33,7 +33,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.work.GetJobActivityGraphJSONWork;
+import edu.uci.ics.hyracks.control.cc.work.GetActivityClusterGraphJSONWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobRunJSONWork;
public class JobDetailsPage extends AbstractPage {
@@ -48,13 +48,13 @@
JobId jobId = JobId.parse(jobIdStr.toString());
- GetJobActivityGraphJSONWork gjagw = new GetJobActivityGraphJSONWork(ccs, jobId);
- ccs.getWorkQueue().scheduleAndSync(gjagw);
- Label jag = new Label("job-activity-graph", gjagw.getJSON().toString());
+ GetActivityClusterGraphJSONWork gacgw = new GetActivityClusterGraphJSONWork(ccs, jobId);
+ ccs.getWorkQueue().scheduleAndSync(gacgw);
+ Label jag = new Label("activity-cluster-graph", gacgw.getJSON().toString());
jag.setEscapeModelStrings(false);
add(jag);
- JSONObject jagO = gjagw.getJSON();
+ JSONObject jagO = gacgw.getJSON();
Map<ActivityId, String> activityMap = new HashMap<ActivityId, String>();
if (jagO.has("activities")) {
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 325ea84..24dfa7c 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -25,10 +25,9 @@
import edu.uci.ics.hyracks.api.application.ICCBootstrap;
import edu.uci.ics.hyracks.api.context.ICCContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
-import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -63,9 +62,10 @@
return ccContext;
}
- public JobSpecification createJobSpecification(byte[] bytes) throws HyracksException {
+ public IActivityClusterGraphGeneratorFactory createActivityClusterGraphGeneratorFactory(byte[] bytes)
+ throws HyracksException {
try {
- return (JobSpecification) JavaSerializationUtils.deserialize(bytes, getClassLoader());
+ return (IActivityClusterGraphGeneratorFactory) JavaSerializationUtils.deserialize(bytes, getClassLoader());
} catch (IOException e) {
throw new HyracksException(e);
} catch (ClassNotFoundException e) {
@@ -102,10 +102,10 @@
}
}
- public synchronized void notifyJobCreation(JobId jobId, IOperatorDescriptorRegistry specification)
+ public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf)
throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
- l.notifyJobCreation(jobId, specification);
+ l.notifyJobCreation(jobId, acggf);
}
}
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
deleted file mode 100644
index d351d4a..0000000
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.job;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-
-public class ActivityCluster {
- private final Set<ActivityId> activities;
-
- private final Set<ActivityCluster> dependencies;
-
- private final Set<ActivityCluster> dependents;
-
- private ActivityClusterId id;
-
- private ActivityClusterPlan acp;
-
- public ActivityCluster(Set<ActivityId> activities) {
- this.activities = activities;
- dependencies = new HashSet<ActivityCluster>();
- dependents = new HashSet<ActivityCluster>();
- }
-
- public Set<ActivityId> getActivities() {
- return activities;
- }
-
- public void addDependency(ActivityCluster stage) {
- dependencies.add(stage);
- }
-
- public void addDependent(ActivityCluster stage) {
- dependents.add(stage);
- }
-
- public Set<ActivityCluster> getDependencies() {
- return dependencies;
- }
-
- public Set<ActivityCluster> getDependents() {
- return dependents;
- }
-
- public ActivityClusterId getActivityClusterId() {
- return id;
- }
-
- public void setActivityClusterId(ActivityClusterId id) {
- this.id = id;
- }
-
- public ActivityClusterPlan getPlan() {
- return acp;
- }
-
- public void setPlan(ActivityClusterPlan acp) {
- this.acp = acp;
- }
-
- @Override
- public String toString() {
- return String.valueOf(activities);
- }
-}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index 707ef1f..6f26de2 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.cc.job;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -29,10 +30,15 @@
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
+import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.partitions.PartitionMatchMaker;
import edu.uci.ics.hyracks.control.cc.scheduler.ActivityPartitionDetails;
import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
@@ -41,7 +47,17 @@
public class JobRun implements IJobStatusConditionVariable {
private final JobId jobId;
- private final JobActivityGraph jag;
+ private final String applicationName;
+
+ private final IActivityClusterGraphGenerator acgg;
+
+ private final ActivityClusterGraph acg;
+
+ private final JobScheduler scheduler;
+
+ private final EnumSet<JobFlag> jobFlags;
+
+ private final Map<ActivityClusterId, ActivityClusterPlan> activityClusterPlanMap;
private final PartitionMatchMaker pmm;
@@ -51,14 +67,8 @@
private final JobProfile profile;
- private Set<ActivityCluster> activityClusters;
-
- private final Map<ActivityId, ActivityCluster> activityClusterMap;
-
private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap;
- private JobScheduler js;
-
private long createTime;
private long startTime;
@@ -73,14 +83,19 @@
private Exception pendingException;
- public JobRun(JobId jobId, JobActivityGraph plan) {
+ public JobRun(ClusterControllerService ccs, JobId jobId, String applicationName,
+ IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
this.jobId = jobId;
- this.jag = plan;
+ this.applicationName = applicationName;
+ this.acgg = acgg;
+ this.acg = acgg.initialize();
+ this.scheduler = new JobScheduler(ccs, this, acgg.getConstraints());
+ this.jobFlags = jobFlags;
+ activityClusterPlanMap = new HashMap<ActivityClusterId, ActivityClusterPlan>();
pmm = new PartitionMatchMaker();
participatingNodeIds = new HashSet<String>();
cleanupPendingNodeIds = new HashSet<String>();
profile = new JobProfile(jobId);
- activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
}
@@ -88,8 +103,20 @@
return jobId;
}
- public JobActivityGraph getJobActivityGraph() {
- return jag;
+ public String getApplicationName() {
+ return applicationName;
+ }
+
+ public ActivityClusterGraph getActivityClusterGraph() {
+ return acg;
+ }
+
+ public EnumSet<JobFlag> getFlags() {
+ return jobFlags;
+ }
+
+ public Map<ActivityClusterId, ActivityClusterPlan> getActivityClusterPlanMap() {
+ return activityClusterPlanMap;
}
public PartitionMatchMaker getPartitionMatchMaker() {
@@ -169,24 +196,8 @@
return profile;
}
- public void setScheduler(JobScheduler js) {
- this.js = js;
- }
-
public JobScheduler getScheduler() {
- return js;
- }
-
- public Map<ActivityId, ActivityCluster> getActivityClusterMap() {
- return activityClusterMap;
- }
-
- public Set<ActivityCluster> getActivityClusters() {
- return activityClusters;
- }
-
- public void setActivityClusters(Set<ActivityCluster> activityClusters) {
- this.activityClusters = activityClusters;
+ return scheduler;
}
public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicyMap() {
@@ -197,37 +208,31 @@
JSONObject result = new JSONObject();
result.put("job-id", jobId.toString());
- result.put("application-name", jag.getApplicationName());
+ result.put("application-name", applicationName);
result.put("status", getStatus());
result.put("create-time", getCreateTime());
result.put("start-time", getCreateTime());
result.put("end-time", getCreateTime());
JSONArray aClusters = new JSONArray();
- for (ActivityCluster ac : activityClusters) {
+ for (ActivityCluster ac : acg.getActivityClusterMap().values()) {
JSONObject acJSON = new JSONObject();
- acJSON.put("activity-cluster-id", String.valueOf(ac.getActivityClusterId()));
+ acJSON.put("activity-cluster-id", String.valueOf(ac.getId()));
JSONArray activitiesJSON = new JSONArray();
- for (ActivityId aid : ac.getActivities()) {
+ for (ActivityId aid : ac.getActivityMap().keySet()) {
activitiesJSON.put(aid);
}
acJSON.put("activities", activitiesJSON);
- JSONArray dependentsJSON = new JSONArray();
- for (ActivityCluster dependent : ac.getDependents()) {
- dependentsJSON.put(String.valueOf(dependent.getActivityClusterId()));
- }
- acJSON.put("dependents", dependentsJSON);
-
JSONArray dependenciesJSON = new JSONArray();
for (ActivityCluster dependency : ac.getDependencies()) {
- dependenciesJSON.put(String.valueOf(dependency.getActivityClusterId()));
+ dependenciesJSON.put(String.valueOf(dependency.getId()));
}
acJSON.put("dependencies", dependenciesJSON);
- ActivityClusterPlan acp = ac.getPlan();
+ ActivityClusterPlan acp = activityClusterPlanMap.get(ac.getId());
if (acp == null) {
acJSON.put("plan", (Object) null);
} else {
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index f41cda9..17beb2d 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Set;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
public class TaskCluster {
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java
index b1000dd..1218c14 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskClusterId.java
@@ -16,6 +16,8 @@
import java.io.Serializable;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+
public final class TaskClusterId implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
deleted file mode 100644
index 9acc4b8..0000000
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.scheduler;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-import edu.uci.ics.hyracks.control.cc.job.ActivityClusterId;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
-
-public class ActivityClusterGraphBuilder {
- private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
-
- private final JobRun jobRun;
-
- public ActivityClusterGraphBuilder(JobRun jobRun) {
- this.jobRun = jobRun;
- }
-
- private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, Set<ActivityCluster> eqSets) {
- for (ActivityCluster eqSet : eqSets) {
- for (ActivityId t : eqSet.getActivities()) {
- List<IConnectorDescriptor> inputList = jag.getActivityInputMap().get(t);
- if (inputList != null) {
- for (IConnectorDescriptor conn : inputList) {
- ActivityId inTask = jag.getProducerActivity(conn.getConnectorId());
- if (!eqSet.getActivities().contains(inTask)) {
- return Pair.<ActivityId, ActivityId> of(t, inTask);
- }
- }
- }
- List<IConnectorDescriptor> outputList = jag.getActivityOutputMap().get(t);
- if (outputList != null) {
- for (IConnectorDescriptor conn : outputList) {
- ActivityId outTask = jag.getConsumerActivity(conn.getConnectorId());
- if (!eqSet.getActivities().contains(outTask)) {
- return Pair.<ActivityId, ActivityId> of(t, outTask);
- }
- }
- }
- }
- }
- return null;
- }
-
- public Set<ActivityCluster> inferActivityClusters(JobActivityGraph jag) {
- /*
- * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
- */
- Map<ActivityId, ActivityCluster> stageMap = new HashMap<ActivityId, ActivityCluster>();
- Set<ActivityCluster> stages = new HashSet<ActivityCluster>();
- for (ActivityId taskId : jag.getActivityMap().keySet()) {
- Set<ActivityId> eqSet = new HashSet<ActivityId>();
- eqSet.add(taskId);
- ActivityCluster stage = new ActivityCluster(eqSet);
- stageMap.put(taskId, stage);
- stages.add(stage);
- }
-
- boolean changed = true;
- while (changed) {
- changed = false;
- Pair<ActivityId, ActivityId> pair = findMergePair(jag, stages);
- if (pair != null) {
- merge(stageMap, stages, pair.getLeft(), pair.getRight());
- changed = true;
- }
- }
-
- Map<ActivityId, Set<ActivityId>> blocker2BlockedMap = jag.getBlocker2BlockedMap();
- for (ActivityCluster s : stages) {
- Set<ActivityCluster> blockedStages = new HashSet<ActivityCluster>();
- for (ActivityId t : s.getActivities()) {
- Set<ActivityId> blockedTasks = blocker2BlockedMap.get(t);
- if (blockedTasks != null) {
- for (ActivityId bt : blockedTasks) {
- blockedStages.add(stageMap.get(bt));
- }
- }
- }
- for (ActivityCluster bs : blockedStages) {
- bs.addDependency(s);
- s.addDependent(bs);
- }
- }
- Set<ActivityCluster> roots = new HashSet<ActivityCluster>();
- int idCounter = 0;
- for (ActivityCluster s : stages) {
- s.setActivityClusterId(new ActivityClusterId(idCounter++));
- if (s.getDependents().isEmpty()) {
- roots.add(s);
- }
- }
- jobRun.setActivityClusters(stages);
- jobRun.getActivityClusterMap().putAll(stageMap);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Inferred " + stages.size() + " stages");
- for (ActivityCluster s : stages) {
- LOGGER.info(s.toString());
- }
- }
- return roots;
- }
-
- private void merge(Map<ActivityId, ActivityCluster> eqSetMap, Set<ActivityCluster> eqSets, ActivityId t1,
- ActivityId t2) {
- ActivityCluster stage1 = eqSetMap.get(t1);
- Set<ActivityId> s1 = stage1.getActivities();
- ActivityCluster stage2 = eqSetMap.get(t2);
- Set<ActivityId> s2 = stage2.getActivities();
-
- Set<ActivityId> mergedSet = new HashSet<ActivityId>();
- mergedSet.addAll(s1);
- mergedSet.addAll(s2);
-
- eqSets.remove(stage1);
- eqSets.remove(stage2);
- ActivityCluster mergedStage = new ActivityCluster(mergedSet);
- eqSets.add(mergedStage);
-
- for (ActivityId t : mergedSet) {
- eqSetMap.put(t, mergedStage);
- }
- }
-}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index 23f53f4..6c52aac 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -38,9 +38,9 @@
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
import edu.uci.ics.hyracks.control.cc.job.ActivityPlan;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -62,7 +62,7 @@
partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
}
- public void planActivityCluster(ActivityCluster ac) throws HyracksException {
+ public ActivityClusterPlan planActivityCluster(ActivityCluster ac) throws HyracksException {
JobRun jobRun = scheduler.getJobRun();
Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
@@ -80,16 +80,16 @@
}
}
- ac.setPlan(new ActivityClusterPlan(taskClusters, activityPlanMap));
+ return new ActivityClusterPlan(taskClusters, activityPlanMap);
}
private Map<ActivityId, ActivityPlan> buildActivityPlanMap(ActivityCluster ac, JobRun jobRun,
Map<ActivityId, ActivityPartitionDetails> pcMap) {
Map<ActivityId, ActivityPlan> activityPlanMap = new HashMap<ActivityId, ActivityPlan>();
Set<ActivityId> depAnIds = new HashSet<ActivityId>();
- for (ActivityId anId : ac.getActivities()) {
+ for (ActivityId anId : ac.getActivityMap().keySet()) {
depAnIds.clear();
- getDependencyActivityIds(depAnIds, anId);
+ getDependencyActivityIds(depAnIds, anId, ac);
ActivityPartitionDetails apd = pcMap.get(anId);
Task[] tasks = new Task[apd.getPartitionCount()];
ActivityPlan activityPlan = new ActivityPlan(apd);
@@ -97,8 +97,8 @@
TaskId tid = new TaskId(anId, i);
tasks[i] = new Task(tid, activityPlan);
for (ActivityId danId : depAnIds) {
- ActivityCluster dAC = jobRun.getActivityClusterMap().get(danId);
- ActivityClusterPlan dACP = dAC.getPlan();
+ ActivityCluster dAC = ac.getActivityClusterGraph().getActivityMap().get(danId);
+ ActivityClusterPlan dACP = jobRun.getActivityClusterPlanMap().get(dAC.getId());
assert dACP != null : "IllegalStateEncountered: Dependent AC is being planned without a plan for dependency AC: Encountered no plan for ActivityID "
+ danId;
Task[] dATasks = dACP.getActivityPlanMap().get(danId).getTasks();
@@ -120,7 +120,7 @@
private TaskCluster[] computeTaskClusters(ActivityCluster ac, JobRun jobRun,
Map<ActivityId, ActivityPlan> activityPlanMap) {
- Set<ActivityId> activities = ac.getActivities();
+ Set<ActivityId> activities = ac.getActivityMap().keySet();
Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = computeTaskConnectivity(jobRun,
activityPlanMap, activities);
@@ -161,15 +161,15 @@
private TaskCluster[] buildConnectorPolicyUnawareTaskClusters(ActivityCluster ac,
Map<ActivityId, ActivityPlan> activityPlanMap) {
List<Task> taskStates = new ArrayList<Task>();
- for (ActivityId anId : ac.getActivities()) {
+ for (ActivityId anId : ac.getActivityMap().keySet()) {
ActivityPlan ap = activityPlanMap.get(anId);
Task[] tasks = ap.getTasks();
for (Task t : tasks) {
taskStates.add(t);
}
}
- TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getActivityClusterId(), 0), ac,
- taskStates.toArray(new Task[taskStates.size()]));
+ TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), 0), ac, taskStates.toArray(new Task[taskStates
+ .size()]));
for (Task t : tc.getTasks()) {
t.setTaskCluster(tc);
}
@@ -179,16 +179,17 @@
private Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> computeTaskConnectivity(JobRun jobRun,
Map<ActivityId, ActivityPlan> activityPlanMap, Set<ActivityId> activities) {
Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
- JobActivityGraph jag = jobRun.getJobActivityGraph();
+ ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
BitSet targetBitmap = new BitSet();
for (ActivityId ac1 : activities) {
+ ActivityCluster ac = acg.getActivityMap().get(ac1);
Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
int nProducers = ac1TaskStates.length;
- List<IConnectorDescriptor> outputConns = jag.getActivityOutputMap().get(ac1);
+ List<IConnectorDescriptor> outputConns = ac.getActivityOutputMap().get(ac1);
if (outputConns != null) {
for (IConnectorDescriptor c : outputConns) {
ConnectorDescriptorId cdId = c.getConnectorId();
- ActivityId ac2 = jag.getConsumerActivity(cdId);
+ ActivityId ac2 = ac.getConsumerActivity(cdId);
Task[] ac2TaskStates = activityPlanMap.get(ac2).getTasks();
int nConsumers = ac2TaskStates.length;
for (int i = 0; i < nProducers; ++i) {
@@ -214,7 +215,7 @@
Map<ActivityId, ActivityPlan> activityPlanMap,
Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity) {
Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
- for (ActivityId anId : ac.getActivities()) {
+ for (ActivityId anId : ac.getActivityMap().keySet()) {
ActivityPlan ap = activityPlanMap.get(anId);
Task[] tasks = ap.getTasks();
for (Task t : tasks) {
@@ -298,7 +299,7 @@
for (TaskId tid : cluster) {
taskStates.add(activityPlanMap.get(tid.getActivityId()).getTasks()[tid.getPartition()]);
}
- TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getActivityClusterId(), counter++), ac,
+ TaskCluster tc = new TaskCluster(new TaskClusterId(ac.getId(), counter++), ac,
taskStates.toArray(new Task[taskStates.size()]));
tcSet.add(tc);
for (TaskId tid : cluster) {
@@ -310,35 +311,34 @@
}
private TaskCluster getTaskCluster(TaskId tid) {
- ActivityCluster ac = scheduler.getJobRun().getActivityClusterMap().get(tid.getActivityId());
- ActivityClusterPlan acp = ac.getPlan();
+ JobRun run = scheduler.getJobRun();
+ ActivityCluster ac = run.getActivityClusterGraph().getActivityMap().get(tid.getActivityId());
+ ActivityClusterPlan acp = run.getActivityClusterPlanMap().get(ac.getId());
Task[] tasks = acp.getActivityPlanMap().get(tid.getActivityId()).getTasks();
Task task = tasks[tid.getPartition()];
assert task.getTaskId().equals(tid);
return task.getTaskCluster();
}
- private void getDependencyActivityIds(Set<ActivityId> depAnIds, ActivityId anId) {
- JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
- Set<ActivityId> blockers = jag.getBlocked2BlockerMap().get(anId);
+ private void getDependencyActivityIds(Set<ActivityId> depAnIds, ActivityId anId, ActivityCluster ac) {
+ Set<ActivityId> blockers = ac.getBlocked2BlockerMap().get(anId);
if (blockers != null) {
depAnIds.addAll(blockers);
}
}
private void assignConnectorPolicy(ActivityCluster ac, Map<ActivityId, ActivityPlan> taskMap) {
- JobActivityGraph jag = scheduler.getJobRun().getJobActivityGraph();
Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
- Set<ActivityId> activities = ac.getActivities();
+ Set<ActivityId> activities = ac.getActivityMap().keySet();
BitSet targetBitmap = new BitSet();
for (ActivityId a1 : activities) {
Task[] ac1TaskStates = taskMap.get(a1).getTasks();
int nProducers = ac1TaskStates.length;
- List<IConnectorDescriptor> outputConns = jag.getActivityOutputMap().get(a1);
+ List<IConnectorDescriptor> outputConns = ac.getActivityOutputMap().get(a1);
if (outputConns != null) {
for (IConnectorDescriptor c : outputConns) {
ConnectorDescriptorId cdId = c.getConnectorId();
- ActivityId a2 = jag.getConsumerActivity(cdId);
+ ActivityId a2 = ac.getConsumerActivity(cdId);
Task[] ac2TaskStates = taskMap.get(a2).getTasks();
int nConsumers = ac2TaskStates.length;
@@ -347,7 +347,7 @@
c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
fanouts[i] = targetBitmap.cardinality();
}
- IConnectorPolicy cp = assignConnectorPolicy(c, nProducers, nConsumers, fanouts);
+ IConnectorPolicy cp = assignConnectorPolicy(ac, c, nProducers, nConsumers, fanouts);
cPolicyMap.put(cdId, cp);
}
}
@@ -355,9 +355,9 @@
scheduler.getJobRun().getConnectorPolicyMap().putAll(cPolicyMap);
}
- private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
- IConnectorPolicyAssignmentPolicy cpap = scheduler.getJobRun().getJobActivityGraph()
- .getConnectorPolicyAssignmentPolicy();
+ private IConnectorPolicy assignConnectorPolicy(ActivityCluster ac, IConnectorDescriptor c, int nProducers,
+ int nConsumers, int[] fanouts) {
+ IConnectorPolicyAssignmentPolicy cpap = ac.getConnectorPolicyAssignmentPolicy();
if (cpap != null) {
return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
}
@@ -367,10 +367,8 @@
private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
throws HyracksException {
PartitionConstraintSolver solver = scheduler.getSolver();
- JobRun jobRun = scheduler.getJobRun();
- JobActivityGraph jag = jobRun.getJobActivityGraph();
Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
- for (ActivityId anId : ac.getActivities()) {
+ for (ActivityId anId : ac.getActivityMap().keySet()) {
lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
}
solver.solve(lValues);
@@ -391,26 +389,26 @@
nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
}
Map<ActivityId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityId, ActivityPartitionDetails>();
- for (ActivityId anId : ac.getActivities()) {
+ for (ActivityId anId : ac.getActivityMap().keySet()) {
int nParts = nPartMap.get(anId.getOperatorDescriptorId());
int[] nInputPartitions = null;
- List<IConnectorDescriptor> inputs = jag.getActivityInputMap().get(anId);
+ List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(anId);
if (inputs != null) {
nInputPartitions = new int[inputs.size()];
for (int i = 0; i < nInputPartitions.length; ++i) {
ConnectorDescriptorId cdId = inputs.get(i).getConnectorId();
- ActivityId aid = jag.getProducerActivity(cdId);
+ ActivityId aid = ac.getProducerActivity(cdId);
Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
nInputPartitions[i] = nPartInt;
}
}
int[] nOutputPartitions = null;
- List<IConnectorDescriptor> outputs = jag.getActivityOutputMap().get(anId);
+ List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(anId);
if (outputs != null) {
nOutputPartitions = new int[outputs.size()];
for (int i = 0; i < nOutputPartitions.length; ++i) {
ConnectorDescriptorId cdId = outputs.get(i).getConnectorId();
- ActivityId aid = jag.getConsumerActivity(cdId);
+ ActivityId aid = ac.getConsumerActivity(cdId);
Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
nOutputPartitions[i] = nPartInt;
}
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index b3a76ad..b163db5 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -35,14 +35,15 @@
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
+import edu.uci.ics.hyracks.control.cc.job.ActivityClusterPlan;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.Task;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
@@ -66,8 +67,6 @@
private final Set<TaskCluster> inProgressTaskClusters;
- private Set<ActivityCluster> rootActivityClusters;
-
public JobScheduler(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints) {
this.ccs = ccs;
this.jobRun = jobRun;
@@ -75,8 +74,6 @@
partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
inProgressTaskClusters = new HashSet<TaskCluster>();
solver.addConstraints(constraints);
- ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder(jobRun);
- rootActivityClusters = acgb.inferActivityClusters(jobRun.getJobActivityGraph());
}
public JobRun getJobRun() {
@@ -91,7 +88,7 @@
startRunnableActivityClusters();
}
- private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Set<ActivityCluster> roots)
+ private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots)
throws HyracksException {
for (ActivityCluster root : roots) {
findRunnableTaskClusterRoots(frontier, root);
@@ -107,14 +104,13 @@
findRunnableTaskClusterRoots(frontier, depAC);
} else {
boolean tcRootsComplete = true;
- Set<TaskCluster> depACTCRoots = new HashSet<TaskCluster>();
- for (TaskCluster tc : depAC.getPlan().getTaskClusters()) {
+ for (TaskCluster tc : getActivityClusterPlan(depAC).getTaskClusters()) {
if (tc.getProducedPartitions().isEmpty()) {
TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
tcRootsComplete = false;
+ break;
}
- depACTCRoots.add(tc);
}
}
if (!tcRootsComplete) {
@@ -126,10 +122,11 @@
if (depsComplete) {
if (!isPlanned(candidate)) {
ActivityClusterPlanner acp = new ActivityClusterPlanner(this);
- acp.planActivityCluster(candidate);
+ ActivityClusterPlan acPlan = acp.planActivityCluster(candidate);
+ jobRun.getActivityClusterPlanMap().put(candidate.getId(), acPlan);
partitionProducingTaskClusterMap.putAll(acp.getPartitionProducingTaskClusterMap());
}
- for (TaskCluster tc : candidate.getPlan().getTaskClusters()) {
+ for (TaskCluster tc : getActivityClusterPlan(candidate).getTaskClusters()) {
if (tc.getProducedPartitions().isEmpty()) {
TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
if (tca == null || tca.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
@@ -140,13 +137,18 @@
}
}
+ private ActivityClusterPlan getActivityClusterPlan(ActivityCluster ac) {
+ return jobRun.getActivityClusterPlanMap().get(ac.getId());
+ }
+
private boolean isPlanned(ActivityCluster ac) {
- return ac.getPlan() != null;
+ return jobRun.getActivityClusterPlanMap().get(ac.getId()) != null;
}
private void startRunnableActivityClusters() throws HyracksException {
Set<TaskCluster> taskClusterRoots = new HashSet<TaskCluster>();
- findRunnableTaskClusterRoots(taskClusterRoots, rootActivityClusters);
+ findRunnableTaskClusterRoots(taskClusterRoots, jobRun.getActivityClusterGraph().getActivityClusterMap()
+ .values());
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Runnable TC roots: " + taskClusterRoots + ", inProgressTaskClusters: "
+ inProgressTaskClusters);
@@ -286,7 +288,7 @@
private void assignTaskLocations(TaskCluster tc, Map<String, List<TaskAttemptDescriptor>> taskAttemptMap)
throws HyracksException {
- JobActivityGraph jag = jobRun.getJobActivityGraph();
+ ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
Task[] tasks = tc.getTasks();
List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
int attempts = tcAttempts.size();
@@ -309,7 +311,7 @@
Task ts = tasks[i];
TaskId tid = ts.getTaskId();
TaskAttempt taskAttempt = taskAttempts.get(tid);
- String nodeId = assignLocation(jag, locationMap, tid, taskAttempt);
+ String nodeId = assignLocation(acg, locationMap, tid, taskAttempt);
taskAttempt.setNodeId(nodeId);
taskAttempt.setStatus(TaskAttempt.TaskStatus.RUNNING, null);
taskAttempt.setStartTime(System.currentTimeMillis());
@@ -355,10 +357,11 @@
inProgressTaskClusters.add(tc);
}
- private String assignLocation(JobActivityGraph jag, Map<TaskId, LValueConstraintExpression> locationMap,
+ private String assignLocation(ActivityClusterGraph acg, Map<TaskId, LValueConstraintExpression> locationMap,
TaskId tid, TaskAttempt taskAttempt) throws HyracksException {
ActivityId aid = tid.getActivityId();
- Set<ActivityId> blockers = jag.getBlocked2BlockerMap().get(aid);
+ ActivityCluster ac = acg.getActivityMap().get(aid);
+ Set<ActivityId> blockers = ac.getBlocked2BlockerMap().get(aid);
String nodeId = null;
if (blockers != null) {
for (ActivityId blocker : blockers) {
@@ -404,8 +407,8 @@
private String findTaskLocation(TaskId tid) {
ActivityId aid = tid.getActivityId();
- ActivityCluster ac = jobRun.getActivityClusterMap().get(aid);
- Task[] tasks = ac.getPlan().getActivityPlanMap().get(aid).getTasks();
+ ActivityCluster ac = jobRun.getActivityClusterGraph().getActivityMap().get(aid);
+ Task[] tasks = getActivityClusterPlan(ac).getActivityPlanMap().get(aid).getTasks();
List<TaskClusterAttempt> tcAttempts = tasks[tid.getPartition()].getTaskCluster().getAttempts();
if (tcAttempts == null || tcAttempts.isEmpty()) {
return null;
@@ -425,8 +428,8 @@
private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
final JobId jobId = jobRun.getJobId();
- final JobActivityGraph jag = jobRun.getJobActivityGraph();
- final String appName = jag.getApplicationName();
+ final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
+ final String appName = jobRun.getApplicationName();
final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>(
jobRun.getConnectorPolicyMap());
for (Map.Entry<String, List<TaskAttemptDescriptor>> entry : taskAttemptMap.entrySet()) {
@@ -440,8 +443,9 @@
LOGGER.fine("Starting: " + taskDescriptors + " at " + entry.getKey());
}
try {
- byte[] jagBytes = changed ? JavaSerializationUtils.serialize(jag) : null;
- node.getNodeController().startTasks(appName, jobId, jagBytes, taskDescriptors, connectorPolicies);
+ byte[] jagBytes = changed ? JavaSerializationUtils.serialize(acg) : null;
+ node.getNodeController().startTasks(appName, jobId, jagBytes, taskDescriptors, connectorPolicies,
+ jobRun.getFlags());
} catch (Exception e) {
e.printStackTrace();
}
@@ -606,7 +610,7 @@
lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
lastAttempt.setEndTime(System.currentTimeMillis());
abortDoomedTaskClusters();
- if (lastAttempt.getAttempt() >= jobRun.getJobActivityGraph().getMaxReattempts()) {
+ if (lastAttempt.getAttempt() >= jobRun.getActivityClusterGraph().getMaxReattempts()) {
abortJob(new HyracksException(details));
return;
}
@@ -629,8 +633,8 @@
public void notifyNodeFailures(Set<String> deadNodes) {
try {
jobRun.getPartitionMatchMaker().notifyNodeFailures(deadNodes);
- for (ActivityCluster ac : jobRun.getActivityClusters()) {
- TaskCluster[] taskClusters = ac.getPlan().getTaskClusters();
+ for (ActivityCluster ac : jobRun.getActivityClusterGraph().getActivityClusterMap().values()) {
+ TaskCluster[] taskClusters = getActivityClusterPlan(ac).getTaskClusters();
if (taskClusters != null) {
for (TaskCluster tc : taskClusters) {
TaskClusterAttempt lastTaskClusterAttempt = findLastTaskClusterAttempt(tc);
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
index c964e4e..98cef8d 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
@@ -19,7 +19,7 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.web.util.IJSONOutputFunction;
-import edu.uci.ics.hyracks.control.cc.work.GetJobActivityGraphJSONWork;
+import edu.uci.ics.hyracks.control.cc.work.GetActivityClusterGraphJSONWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobRunJSONWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobSummariesJSONWork;
@@ -49,7 +49,7 @@
JobId jobId = JobId.parse(arguments[0]);
if ("job-activity-graph".equalsIgnoreCase(arguments[1])) {
- GetJobActivityGraphJSONWork gjage = new GetJobActivityGraphJSONWork(ccs, jobId);
+ GetActivityClusterGraphJSONWork gjage = new GetActivityClusterGraphJSONWork(ccs, jobId);
ccs.getWorkQueue().scheduleAndSync(gjage);
result.put("result", gjage.getJSON());
} else if ("job-run".equalsIgnoreCase(arguments[1])) {
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
index 01f14f3..8a552f5 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/AbstractTaskLifecycleWork.java
@@ -20,9 +20,9 @@
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.cc.job.ActivityPlan;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.Task;
@@ -49,10 +49,11 @@
JobRun run = ccs.getActiveRunMap().get(jobId);
if (run != null) {
TaskId tid = taId.getTaskId();
- Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
+ Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterGraph().getActivityMap();
ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
if (ac != null) {
- Map<ActivityId, ActivityPlan> taskStateMap = ac.getPlan().getActivityPlanMap();
+ Map<ActivityId, ActivityPlan> taskStateMap = run.getActivityClusterPlanMap().get(ac.getId())
+ .getActivityPlanMap();
Task[] taskStates = taskStateMap.get(tid.getActivityId()).getTasks();
if (taskStates != null && taskStates.length > tid.getPartition()) {
Task ts = taskStates[tid.getPartition()];
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobActivityGraphJSONWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
similarity index 87%
rename from hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobActivityGraphJSONWork.java
rename to hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
index 677bece..1f79763 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobActivityGraphJSONWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetActivityClusterGraphJSONWork.java
@@ -21,12 +21,12 @@
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-public class GetJobActivityGraphJSONWork extends SynchronizableWork {
+public class GetActivityClusterGraphJSONWork extends SynchronizableWork {
private final ClusterControllerService ccs;
private final JobId jobId;
private JSONObject json;
- public GetJobActivityGraphJSONWork(ClusterControllerService ccs, JobId jobId) {
+ public GetActivityClusterGraphJSONWork(ClusterControllerService ccs, JobId jobId) {
this.ccs = ccs;
this.jobId = jobId;
}
@@ -41,7 +41,7 @@
return;
}
}
- json = run.getJobActivityGraph().toJSON();
+ json = run.getActivityClusterGraph().toJSON();
}
public JSONObject getJSON() {
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSummariesJSONWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSummariesJSONWork.java
index 5cc31f6..a0afd61 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSummariesJSONWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSummariesJSONWork.java
@@ -44,7 +44,7 @@
JSONObject jo = new JSONObject();
jo.put("type", "job-summary");
jo.put("job-id", run.getJobId().toString());
- jo.put("application-name", run.getJobActivityGraph().getApplicationName());
+ jo.put("application-name", run.getApplicationName());
jo.put("create-time", run.getCreateTime());
jo.put("start-time", run.getCreateTime());
jo.put("end-time", run.getCreateTime());
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 8d8e85d..6b5ff03 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -21,7 +21,7 @@
import org.json.JSONObject;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -69,7 +69,7 @@
}
}
} else {
- CCApplicationContext appCtx = ccs.getApplicationMap().get(run.getJobActivityGraph().getApplicationName());
+ CCApplicationContext appCtx = ccs.getApplicationMap().get(run.getApplicationName());
if (appCtx != null) {
try {
appCtx.notifyJobFinish(jobId);
@@ -91,8 +91,8 @@
private JSONObject createJobLogObject(final JobRun run) {
JSONObject jobLogObject = new JSONObject();
try {
- JobActivityGraph jag = run.getJobActivityGraph();
- jobLogObject.put("job-activity-graph", jag.toJSON());
+ ActivityClusterGraph acg = run.getActivityClusterGraph();
+ jobLogObject.put("activity-cluster-graph", acg.toJSON());
jobLogObject.put("job-run", run.toJSON());
} catch (JSONException e) {
throw new RuntimeException(e);
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index fa22691..b6a33cd 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -15,43 +15,32 @@
package edu.uci.ics.hyracks.control.cc.work;
import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Set;
-import edu.uci.ics.hyracks.api.constraints.Constraint;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
-import edu.uci.ics.hyracks.control.cc.job.IConnectorDescriptorVisitor;
-import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.control.cc.job.JobActivityGraphBuilder;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
-import edu.uci.ics.hyracks.control.cc.scheduler.JobScheduler;
import edu.uci.ics.hyracks.control.common.work.IResultCallback;
import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
public class JobStartWork extends SynchronizableWork {
private final ClusterControllerService ccs;
- private final byte[] jobSpec;
+ private final byte[] acggfBytes;
private final EnumSet<JobFlag> jobFlags;
private final JobId jobId;
private final String appName;
private final IResultCallback<JobId> callback;
- public JobStartWork(ClusterControllerService ccs, String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags,
+ public JobStartWork(ClusterControllerService ccs, String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags,
JobId jobId, IResultCallback<JobId> callback) {
this.jobId = jobId;
this.ccs = ccs;
- this.jobSpec = jobSpec;
+ this.acggfBytes = acggfBytes;
this.jobFlags = jobFlags;
this.appName = appName;
this.callback = callback;
@@ -64,53 +53,13 @@
if (appCtx == null) {
throw new HyracksException("No application with id " + appName + " found");
}
- JobSpecification spec = appCtx.createJobSpecification(jobSpec);
-
- final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(appName, spec, jobFlags);
- PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
- @Override
- public void visit(IConnectorDescriptor conn) throws HyracksException {
- builder.addConnector(conn);
- }
- });
- PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
- @Override
- public void visit(IOperatorDescriptor op) {
- op.contributeActivities(builder);
- }
- });
- builder.finish();
- final JobActivityGraph jag = builder.getActivityGraph();
-
- JobRun run = new JobRun(jobId, jag);
-
+ IActivityClusterGraphGeneratorFactory acggf = appCtx.createActivityClusterGraphGeneratorFactory(acggfBytes);
+ IActivityClusterGraphGenerator acgg = acggf.createActivityClusterGraphGenerator(appName, jobId, appCtx,
+ jobFlags);
+ JobRun run = new JobRun(ccs, jobId, appName, acgg, jobFlags);
run.setStatus(JobStatus.INITIALIZED, null);
-
ccs.getActiveRunMap().put(jobId, run);
- final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
- final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
- @Override
- public void addConstraint(Constraint constraint) {
- contributedConstraints.add(constraint);
- }
- };
- PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
- @Override
- public void visit(IOperatorDescriptor op) {
- op.contributeSchedulingConstraints(acceptor, jag, appCtx);
- }
- });
- PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
- @Override
- public void visit(IConnectorDescriptor conn) {
- conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
- }
- });
- contributedConstraints.addAll(spec.getUserConstraints());
-
- JobScheduler jrs = new JobScheduler(ccs, run, contributedConstraints);
- run.setScheduler(jrs);
- appCtx.notifyJobCreation(jobId, spec);
+ appCtx.notifyJobCreation(jobId, acggf);
run.setStatus(JobStatus.RUNNING, null);
try {
run.getScheduler().startJob();
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index dea4a8d..9927289 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -22,7 +22,7 @@
import org.json.JSONObject;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -59,7 +59,7 @@
ncs.getActiveJobIds().remove(jobId);
}
if (cleanupPendingNodes.isEmpty()) {
- CCApplicationContext appCtx = ccs.getApplicationMap().get(run.getJobActivityGraph().getApplicationName());
+ CCApplicationContext appCtx = ccs.getApplicationMap().get(run.getApplicationName());
if (appCtx != null) {
try {
appCtx.notifyJobFinish(jobId);
@@ -82,8 +82,8 @@
private JSONObject createJobLogObject(final JobRun run) {
JSONObject jobLogObject = new JSONObject();
try {
- JobActivityGraph jag = run.getJobActivityGraph();
- jobLogObject.put("job-activity-graph", jag.toJSON());
+ ActivityClusterGraph acg = run.getActivityClusterGraph();
+ jobLogObject.put("activity-cluster-graph", acg.toJSON());
jobLogObject.put("job-run", run.toJSON());
} catch (JSONException e) {
throw new RuntimeException(e);
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
index 2456c25..2a844ad 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterPartitionAvailibilityWork.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.control.cc.work;
import java.util.List;
-import java.util.logging.Level;
import org.apache.commons.lang3.tuple.Pair;
@@ -57,11 +56,6 @@
}
@Override
- public Level logLevel() {
- return Level.FINEST;
- }
-
- @Override
public String toString() {
return "PartitionAvailable@" + partitionDescriptor;
}
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
index 602d860..0fff257 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/TaskFailureWork.java
@@ -15,9 +15,9 @@
package edu.uci.ics.hyracks.control.cc.work;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 47f4ceb..049adf8 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.common.base;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -21,6 +22,7 @@
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -28,7 +30,7 @@
public interface INodeController {
public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception;
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception;
public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
index 350f527..6c208fe 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.common.controllers;
+import java.io.File;
import java.util.List;
import org.kohsuke.args4j.Option;
@@ -52,6 +53,9 @@
@Option(name = "-cc-root", usage = "Sets the root folder used for file operations. (default: ClusterControllerService)")
public String ccRoot = "ClusterControllerService";
+ @Option(name = "-cluster-topology", usage = "Sets the XML file that defines the cluster topology. (default: null)")
+ public File clusterTopologyDefinition = null;
+
public void toCommandLine(List<String> cList) {
cList.add("-client-net-ip-address");
cList.add(clientNetIpAddress);
@@ -75,5 +79,9 @@
cList.add(String.valueOf(jobHistorySize));
cList.add("-cc-root");
cList.add(ccRoot);
+ if (clusterTopologyDefinition != null) {
+ cList.add("-cluster-topology");
+ cList.add(clusterTopologyDefinition.getAbsolutePath());
+ }
}
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index e5f0efc..8f0056f 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -22,6 +22,7 @@
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
@@ -34,6 +35,7 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -498,15 +500,17 @@
private final byte[] planBytes;
private final List<TaskAttemptDescriptor> taskDescriptors;
private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+ private final EnumSet<JobFlag> flags;
public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) {
this.appName = appName;
this.jobId = jobId;
this.planBytes = planBytes;
this.taskDescriptors = taskDescriptors;
this.connectorPolicies = connectorPolicies;
+ this.flags = flags;
}
@Override
@@ -533,6 +537,10 @@
public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
return connectorPolicies;
}
+
+ public EnumSet<JobFlag> getFlags() {
+ return flags;
+ }
}
public static class AbortTasksFunction extends Function {
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 8176cb7..10c0a7c 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.control.common.ipc;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -21,6 +22,7 @@
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -37,9 +39,9 @@
@Override
public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, EnumSet<JobFlag> flags) throws Exception {
CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(appName, jobId, planBytes,
- taskDescriptors, connectorPolicies);
+ taskDescriptors, connectorPolicies, flags);
ipcHandle.send(-1, stf, null);
}
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
index 36f0c49..26e5412 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -70,11 +70,11 @@
public void schedule(AbstractWork event) {
enqueueCount.incrementAndGet();
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Enqueue: " + enqueueCount);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Enqueue: " + enqueueCount);
}
- if (LOGGER.isLoggable(event.logLevel())) {
- LOGGER.log(event.logLevel(), "Scheduling: " + event);
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.finer("Scheduling: " + event);
}
queue.offer(event);
}
@@ -105,8 +105,8 @@
continue;
}
dequeueCount.incrementAndGet();
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Dequeue: " + dequeueCount + "/" + enqueueCount);
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Dequeue: " + dequeueCount + "/" + enqueueCount);
}
try {
if (LOGGER.isLoggable(r.logLevel())) {
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 10c9c3d..63a45db 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -32,11 +32,11 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
import edu.uci.ics.hyracks.api.job.IGlobalJobDataFactory;
import edu.uci.ics.hyracks.api.job.IJobletEventListener;
import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
@@ -60,7 +60,7 @@
private final JobId jobId;
- private final JobActivityGraph jag;
+ private final ActivityClusterGraph acg;
private final Map<PartitionId, IPartitionCollector> partitionRequestMap;
@@ -84,11 +84,12 @@
private boolean cleanupPending;
- public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx, JobActivityGraph jag) {
+ public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx,
+ ActivityClusterGraph acg) {
this.nodeController = nodeController;
this.appCtx = appCtx;
this.jobId = jobId;
- this.jag = jag;
+ this.acg = acg;
partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
env = new OperatorEnvironmentImpl(nodeController.getId());
stateObjectMap = new HashMap<Object, IStateObject>();
@@ -97,7 +98,7 @@
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
cleanupPending = false;
- IJobletEventListenerFactory jelf = jag.getJobletEventListenerFactory();
+ IJobletEventListenerFactory jelf = acg.getJobletEventListenerFactory();
if (jelf != null) {
IJobletEventListener listener = jelf.createListener(this);
this.jobletEventListener = listener;
@@ -105,7 +106,7 @@
} else {
jobletEventListener = null;
}
- IGlobalJobDataFactory gjdf = jag.getGlobalJobDataFactory();
+ IGlobalJobDataFactory gjdf = acg.getGlobalJobDataFactory();
globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
}
@@ -114,8 +115,8 @@
return jobId;
}
- public JobActivityGraph getJobActivityGraph() {
- return jag;
+ public ActivityClusterGraph getActivityClusterGraph() {
+ return acg;
}
public IOperatorEnvironment getEnvironment() {
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 3cab638..ecdd839 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -377,7 +377,7 @@
case START_TASKS: {
CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
- .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies()));
+ .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
return;
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
index e8c4052..8f8c032 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -21,12 +21,12 @@
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.Task;
-public class AbortTasksWork extends SynchronizableWork {
+public class AbortTasksWork extends AbstractWork {
private static final Logger LOGGER = Logger.getLogger(AbortTasksWork.class.getName());
private final NodeControllerService ncs;
@@ -42,7 +42,7 @@
}
@Override
- protected void doRun() throws Exception {
+ public void run() {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
index b75a1fc..173ab92 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -23,11 +23,11 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.IPartition;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-public class CleanupJobletWork extends SynchronizableWork {
+public class CleanupJobletWork extends AbstractWork {
private static final Logger LOGGER = Logger.getLogger(CleanupJobletWork.class.getName());
private final NodeControllerService ncs;
@@ -43,7 +43,7 @@
}
@Override
- protected void doRun() throws Exception {
+ public void run() {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Cleaning up after job: " + jobId);
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
index 159ecb0..6eb1a95 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -18,8 +18,6 @@
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
@@ -31,13 +29,11 @@
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
-public class CreateApplicationWork extends SynchronizableWork {
- private static final Logger LOGGER = Logger.getLogger(CreateApplicationWork.class.getName());
-
+public class CreateApplicationWork extends AbstractWork {
private final NodeControllerService ncs;
private final String appName;
@@ -55,7 +51,7 @@
}
@Override
- protected void doRun() throws Exception {
+ public void run() {
try {
NCApplicationContext appCtx;
Map<String, NCApplicationContext> applications = ncs.getApplications();
@@ -86,8 +82,7 @@
ncs.getClusterController()
.notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.INITIALIZED);
} catch (Exception e) {
- LOGGER.warning("Error creating application: " + e.getMessage());
- LOGGER.log(Level.WARNING, e.getLocalizedMessage(), e);
+ throw new RuntimeException(e);
}
}
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
index cfe00f6..b104ce8 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
@@ -15,17 +15,14 @@
package edu.uci.ics.hyracks.control.nc.work;
import java.util.Map;
-import java.util.logging.Logger;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
-public class DestroyApplicationWork extends SynchronizableWork {
- private static final Logger LOGGER = Logger.getLogger(DestroyApplicationWork.class.getName());
-
+public class DestroyApplicationWork extends AbstractWork {
private final NodeControllerService ncs;
private final String appName;
@@ -36,16 +33,17 @@
}
@Override
- protected void doRun() throws Exception {
+ public void run() {
try {
Map<String, NCApplicationContext> applications = ncs.getApplications();
ApplicationContext appCtx = applications.remove(appName);
if (appCtx != null) {
appCtx.deinitialize();
}
+ ncs.getClusterController().notifyApplicationStateChange(ncs.getId(), appName,
+ ApplicationStatus.DEINITIALIZED);
} catch (Exception e) {
- LOGGER.warning("Error destroying application: " + e.getMessage());
+ throw new RuntimeException(e);
}
- ncs.getClusterController().notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.DEINITIALIZED);
}
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index 9734567..2cf43da5b 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -17,20 +17,17 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;
-import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
-public class ReportPartitionAvailabilityWork extends SynchronizableWork {
- private static final Logger LOGGER = Logger.getLogger(ReportPartitionAvailabilityWork.class.getName());
-
+public class ReportPartitionAvailabilityWork extends AbstractWork {
private final NodeControllerService ncs;
private final PartitionId pid;
@@ -44,14 +41,18 @@
}
@Override
- protected void doRun() throws Exception {
- Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
- Joblet ji = jobletMap.get(pid.getJobId());
- if (ji != null) {
- PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
- ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
- .getIpAddress()), networkAddress.getPort()), pid, 5));
- ji.reportPartitionAvailability(channel);
+ public void run() {
+ try {
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet ji = jobletMap.get(pid.getJobId());
+ if (ji != null) {
+ PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
+ ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+ .getIpAddress()), networkAddress.getPort()), pid, 5));
+ ji.reportPartitionAvailability(channel);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 8d1b2a6..44be4b3 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -37,12 +37,13 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.Task;
@@ -53,7 +54,7 @@
import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
import edu.uci.ics.hyracks.control.nc.profiling.ProfilingPartitionWriterFactory;
-public class StartTasksWork extends SynchronizableWork {
+public class StartTasksWork extends AbstractWork {
private static final Logger LOGGER = Logger.getLogger(StartTasksWork.class.getName());
private final NodeControllerService ncs;
@@ -62,50 +63,57 @@
private final JobId jobId;
- private final byte[] jagBytes;
+ private final byte[] acgBytes;
private final List<TaskAttemptDescriptor> taskDescriptors;
private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap;
- public StartTasksWork(NodeControllerService ncs, String appName, JobId jobId, byte[] jagBytes,
+ private final EnumSet<JobFlag> flags;
+
+ public StartTasksWork(NodeControllerService ncs, String appName, JobId jobId, byte[] acgBytes,
List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) {
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, EnumSet<JobFlag> flags) {
this.ncs = ncs;
this.appName = appName;
this.jobId = jobId;
- this.jagBytes = jagBytes;
+ this.acgBytes = acgBytes;
this.taskDescriptors = taskDescriptors;
this.connectorPoliciesMap = connectorPoliciesMap;
+ this.flags = flags;
}
@Override
- protected void doRun() throws Exception {
+ public void run() {
try {
Map<String, NCApplicationContext> applications = ncs.getApplications();
NCApplicationContext appCtx = applications.get(appName);
- final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, jagBytes == null ? null
- : (JobActivityGraph) appCtx.deserialize(jagBytes));
- final JobActivityGraph jag = joblet.getJobActivityGraph();
+ final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, acgBytes == null ? null
+ : (ActivityClusterGraph) appCtx.deserialize(acgBytes));
+ final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
@Override
public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
- IConnectorDescriptor conn = jag.getActivityOutputMap().get(aid).get(outputIndex);
- return jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ ActivityCluster ac = acg.getActivityMap().get(aid);
+ IConnectorDescriptor conn = ac.getActivityOutputMap().get(aid).get(outputIndex);
+ return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
}
@Override
public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
- IConnectorDescriptor conn = jag.getActivityInputMap().get(aid).get(inputIndex);
- return jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ ActivityCluster ac = acg.getActivityMap().get(aid);
+ IConnectorDescriptor conn = ac.getActivityInputMap().get(aid).get(inputIndex);
+ return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
}
};
for (TaskAttemptDescriptor td : taskDescriptors) {
TaskAttemptId taId = td.getTaskAttemptId();
TaskId tid = taId.getTaskId();
- IActivity han = jag.getActivityMap().get(tid.getActivityId());
+ ActivityId aid = tid.getActivityId();
+ ActivityCluster ac = acg.getActivityMap().get(aid);
+ IActivity han = ac.getActivityMap().get(aid);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Initializing " + taId + " -> " + han);
}
@@ -115,7 +123,7 @@
List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
- List<IConnectorDescriptor> inputs = jag.getActivityInputMap().get(tid.getActivityId());
+ List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
if (inputs != null) {
for (int i = 0; i < inputs.size(); ++i) {
IConnectorDescriptor conn = inputs.get(i);
@@ -123,21 +131,21 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("input: " + i + ": " + conn.getConnectorId());
}
- RecordDescriptor recordDesc = jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
recordDesc, cPolicy);
collectors.add(collector);
}
}
- List<IConnectorDescriptor> outputs = jag.getActivityOutputMap().get(tid.getActivityId());
+ List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid);
if (outputs != null) {
for (int i = 0; i < outputs.size(); ++i) {
final IConnectorDescriptor conn = outputs.get(i);
- RecordDescriptor recordDesc = jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
- partition, taId, jag.getJobFlags());
+ partition, taId, flags);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("output: " + i + ": " + conn.getConnectorId());
@@ -155,19 +163,19 @@
}
} catch (Exception e) {
e.printStackTrace();
- throw e;
+ throw new RuntimeException(e);
}
}
- private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx, JobActivityGraph jag)
+ private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx, ActivityClusterGraph acg)
throws Exception {
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(jobId);
if (ji == null) {
- if (jag == null) {
+ if (acg == null) {
throw new NullPointerException("JobActivityGraph was null");
}
- ji = new Joblet(ncs, jobId, appCtx, jag);
+ ji = new Joblet(ncs, jobId, appCtx, acg);
jobletMap.put(jobId, ji);
}
return ji;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 61d2733..ef479e1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -21,8 +21,8 @@
import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
public abstract class AbstractConnectorDescriptor implements IConnectorDescriptor {
private static final long serialVersionUID = 1L;
@@ -59,7 +59,7 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ActivityCluster ac,
ICCApplicationContext appCtx) {
// do nothing
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 190d890..153abd3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -77,8 +77,7 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
- ICCApplicationContext appCtx) {
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx) {
// do nothing
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index a69fd01..7d007a4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -27,8 +27,8 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
@@ -60,10 +60,10 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ActivityCluster ac,
ICCApplicationContext appCtx) {
- OperatorDescriptorId consumer = plan.getConsumerActivity(getConnectorId()).getOperatorDescriptorId();
- OperatorDescriptorId producer = plan.getProducerActivity(getConnectorId()).getOperatorDescriptorId();
+ OperatorDescriptorId consumer = ac.getConsumerActivity(getConnectorId()).getOperatorDescriptorId();
+ OperatorDescriptorId producer = ac.getProducerActivity(getConnectorId()).getOperatorDescriptorId();
constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(consumer),
new PartitionCountExpression(producer)));
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
new file mode 100644
index 0000000..b58b1c2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class IdentityOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public IdentityOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc) {
+ super(spec, 1, 1);
+ recordDescriptors[0] = rDesc;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ writer.nextFrame(buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
index 9ec08cf..dabc8a4 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/ChannelSet.java
@@ -24,8 +24,6 @@
public class ChannelSet {
private static final Logger LOGGER = Logger.getLogger(ChannelSet.class.getName());
- private static final int MAX_OPEN_CHANNELS = 1024;
-
private static final int INITIAL_SIZE = 16;
private final MultiplexedConnection mConn;
@@ -204,8 +202,8 @@
}
private ChannelControlBlock createChannel(int idx) throws NetException {
- if (idx >= MAX_OPEN_CHANNELS) {
- throw new NetException("More than " + MAX_OPEN_CHANNELS + " opened concurrently");
+ if (idx > MuxDemuxCommand.MAX_CHANNEL_ID) {
+ throw new NetException("Channel Id > " + MuxDemuxCommand.MAX_CHANNEL_ID + " being opened");
}
if (idx >= ccbArray.length) {
expand(idx);
diff --git a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
index 2e2636b..4610170 100644
--- a/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
+++ b/hyracks-net/src/main/java/edu/uci/ics/hyracks/net/protocols/muxdemux/MuxDemuxCommand.java
@@ -5,11 +5,11 @@
import edu.uci.ics.hyracks.net.exceptions.NetException;
class MuxDemuxCommand {
- static final int MAX_CHANNEL_ID = 0x3ff;
+ static final int MAX_CHANNEL_ID = Integer.MAX_VALUE - 1;
- static final int COMMAND_SIZE = 4;
+ static final int COMMAND_SIZE = 8;
- static final int MAX_DATA_VALUE = 0x7ffff;
+ static final int MAX_DATA_VALUE = 0x1fffffff;
enum CommandType {
OPEN_CHANNEL,
@@ -57,15 +57,15 @@
}
public void write(ByteBuffer buffer) {
- int cmd = (channelId << 22) | (type.ordinal() << 19) | (data & 0x7ffff);
- buffer.putInt(cmd);
+ long cmd = (((long) channelId) << 32) | (((long) type.ordinal()) << 29) | (data & 0x1fffffff);
+ buffer.putLong(cmd);
}
public void read(ByteBuffer buffer) {
- int cmd = buffer.getInt();
- channelId = (cmd >> 22) & 0x3ff;
- type = CommandType.values()[(cmd >> 19) & 0x7];
- data = cmd & 0x7ffff;
+ long cmd = buffer.getLong();
+ channelId = (int) ((cmd >> 32) & 0x7fffffff);
+ type = CommandType.values()[(int) ((cmd >> 29) & 0x7)];
+ data = (int) (cmd & 0x1fffffff);
}
@Override