Refactored Node Controller to use work queue
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@659 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java
index cd6579d..e94086f 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java
@@ -45,7 +45,7 @@
try {
wait();
} catch (InterruptedException e) {
- e.printStackTrace();
+ throw e;
}
}
if (e != null) {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 2176dda..36c31cf 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -154,12 +154,14 @@
taskMap.remove(task);
TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId());
task.dumpProfile(taskProfile);
- nodeController.notifyTaskComplete(jobId, task.getTaskAttemptId(), taskProfile);
+ nodeController.getClusterController().notifyTaskComplete(jobId, task.getTaskAttemptId(),
+ nodeController.getId(), taskProfile);
}
- public synchronized void notifyTaskFailed(Task task, Exception exception) {
+ public synchronized void notifyTaskFailed(Task task, Exception exception) throws Exception {
taskMap.remove(task);
- nodeController.notifyTaskFailed(jobId, task.getTaskAttemptId(), exception);
+ nodeController.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), nodeController.getId(),
+ exception);
}
public NodeControllerService getNodeController() {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 28cb8be..50c2c32 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -15,9 +15,6 @@
package edu.uci.ics.hyracks.control.nc;
import java.io.File;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
@@ -25,12 +22,10 @@
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.text.MessageFormat;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
@@ -44,39 +39,15 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
-
-import edu.uci.ics.hyracks.api.application.INCApplicationContext;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
-import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IActivity;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
-import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.naming.MultipartName;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
-import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
import edu.uci.ics.hyracks.control.common.base.INodeController;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -87,16 +58,18 @@
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
-import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.common.work.WorkQueue;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.net.ConnectionManager;
-import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
-import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
-import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
-import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
+import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
+import edu.uci.ics.hyracks.control.nc.work.CleanupJobWork;
+import edu.uci.ics.hyracks.control.nc.work.CreateApplicationWork;
+import edu.uci.ics.hyracks.control.nc.work.DestroyApplicationWork;
+import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
+import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
public class NodeControllerService extends AbstractRemoteService implements INodeController {
private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
@@ -113,6 +86,8 @@
private final ConnectionManager connectionManager;
+ private final WorkQueue queue;
+
private final Timer timer;
private IClusterController ccs;
@@ -147,6 +122,7 @@
partitionManager = new PartitionManager(this);
connectionManager.setPartitionRequestListener(partitionManager);
+ queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
@@ -206,6 +182,18 @@
return id;
}
+ public ServerContext getServerContext() {
+ return serverCtx;
+ }
+
+ public Map<String, NCApplicationContext> getApplications() {
+ return applications;
+ }
+
+ public Map<JobId, Joblet> getJobletMap() {
+ return jobletMap;
+ }
+
public ConnectionManager getConnectionManager() {
return connectionManager;
}
@@ -218,6 +206,63 @@
return ccs;
}
+ public NodeParameters getNodeParameters() {
+ return nodeParameters;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ @Override
+ public void startTasks(String appName, final JobId jobId, byte[] jagBytes,
+ List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, byte[] ctxVarBytes) throws Exception {
+ StartTasksWork stw = new StartTasksWork(this, appName, jobId, jagBytes, taskDescriptors, connectorPoliciesMap); // NOTE(review): ctxVarBytes is accepted but no longer forwarded — the replaced inline impl deserialized it; confirm StartTasksWork doesn't need it
+ queue.scheduleAndSync(stw);
+ }
+
+ @Override
+ public void cleanUpJob(JobId jobId) throws Exception {
+ CleanupJobWork cjw = new CleanupJobWork(this, jobId);
+ queue.scheduleAndSync(cjw);
+ }
+
+ @Override
+ public void notifyRegistration(IClusterController ccs) throws Exception {
+ this.ccs = ccs;
+ }
+
+ @Override
+ public NCConfig getConfiguration() throws Exception {
+ return ncConfig;
+ }
+
+ @Override
+ public synchronized void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception { // NOTE(review): 'synchronized' looks vestigial — the work executes on the queue thread and the other delegating methods are unsynchronized
+ AbortTasksWork atw = new AbortTasksWork(this, jobId, tasks);
+ queue.scheduleAndSync(atw);
+ }
+
+ @Override
+ public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
+ throws Exception {
+ CreateApplicationWork caw = new CreateApplicationWork(this, appName, deployHar, serializedDistributedState);
+ queue.scheduleAndSync(caw);
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+ DestroyApplicationWork daw = new DestroyApplicationWork(this, appName);
+ queue.scheduleAndSync(daw);
+ }
+
+ @Override
+ public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
+ ReportPartitionAvailabilityWork rpaw = new ReportPartitionAvailabilityWork(this, pid, networkAddress);
+ queue.scheduleAndSync(rpaw);
+ }
+
private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
String ipaddrStr = ncConfig.dataIPAddress;
ipaddrStr = ipaddrStr.trim();
@@ -235,184 +280,6 @@
return InetAddress.getByAddress(ipBytes);
}
- @Override
- public void startTasks(String appName, final JobId jobId, byte[] jagBytes,
- List<TaskAttemptDescriptor> taskDescriptors,
- Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, byte[] ctxVarBytes) throws Exception {
- try {
- NCApplicationContext appCtx = applications.get(appName);
- final JobActivityGraph plan = (JobActivityGraph) appCtx.deserialize(jagBytes);
- Map<MultipartName, Object> ctxVarMap = (Map<MultipartName, Object>) appCtx.deserialize(ctxVarBytes);
-
- IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
- @Override
- public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex) {
- return plan.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
- }
-
- @Override
- public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex) {
- return plan.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
- }
- };
-
- final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx);
-
- for (TaskAttemptDescriptor td : taskDescriptors) {
- TaskAttemptId taId = td.getTaskAttemptId();
- TaskId tid = taId.getTaskId();
- IActivity han = plan.getActivityNodeMap().get(tid.getActivityId());
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Initializing " + taId + " -> " + han);
- }
- final int partition = tid.getPartition();
- Map<MultipartName, Object> inputGlobalVariables = createInputGlobalVariables(ctxVarMap, han);
- Task task = new Task(joblet, taId, han.getClass().getName(), executor);
- IOperatorEnvironment env = joblet.getEnvironment(tid.getActivityId().getOperatorDescriptorId(),
- tid.getPartition());
- IOperatorNodePushable operator = han.createPushRuntime(task, env, rdp, partition,
- td.getPartitionCount());
-
- List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
-
- List<IConnectorDescriptor> inputs = plan.getActivityInputConnectorDescriptors(tid.getActivityId());
- if (inputs != null) {
- for (int i = 0; i < inputs.size(); ++i) {
- IConnectorDescriptor conn = inputs.get(i);
- IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("input: " + i + ": " + conn.getConnectorId());
- }
- RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
- IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
- recordDesc, cPolicy);
- collectors.add(collector);
- }
- }
- List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getActivityId());
- if (outputs != null) {
- for (int i = 0; i < outputs.size(); ++i) {
- final IConnectorDescriptor conn = outputs.get(i);
- RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
- IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
-
- IPartitionWriterFactory pwFactory = createPartitionWriterFactory(cPolicy, jobId, conn,
- partition, taId);
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("output: " + i + ": " + conn.getConnectorId());
- }
- IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
- td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
- operator.setOutputFrameWriter(i, writer, recordDesc);
- }
- }
-
- task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
- joblet.addTask(task);
-
- task.start();
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
-
- private Map<MultipartName, Object> createInputGlobalVariables(Map<MultipartName, Object> ctxVarMap, IActivity han) {
- Map<MultipartName, Object> gVars = new HashMap<MultipartName, Object>();
- // for (MultipartName inVar : han.getInputVariables()) {
- // gVars.put(inVar, ctxVarMap.get(inVar));
- // }
- return gVars;
- }
-
- private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
- int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
- throws HyracksDataException {
- IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
- td.getInputPartitionCounts()[i], td.getPartitionCount());
- if (cPolicy.materializeOnReceiveSide()) {
- return new ReceiveSideMaterializingCollector(ctx, partitionManager, collector, task.getTaskAttemptId(),
- executor);
- } else {
- return collector;
- }
- }
-
- private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final JobId jobId,
- final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId) {
- if (cPolicy.materializeOnSendSide()) {
- return new IPartitionWriterFactory() {
- @Override
- public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new MaterializedPartitionWriter(ctx, partitionManager, new PartitionId(jobId,
- conn.getConnectorId(), senderIndex, receiverIndex), taId, executor);
- }
- };
- } else {
- return new IPartitionWriterFactory() {
- @Override
- public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new PipelinedPartition(partitionManager, new PartitionId(jobId, conn.getConnectorId(),
- senderIndex, receiverIndex), taId);
- }
- };
- }
- }
-
- private synchronized Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx) throws Exception {
- Joblet ji = jobletMap.get(jobId);
- if (ji == null) {
- ji = new Joblet(this, jobId, appCtx);
- jobletMap.put(jobId, ji);
- }
- return ji;
- }
-
- public Executor getExecutor() {
- return executor;
- }
-
- @Override
- public void cleanUpJob(JobId jobId) throws Exception {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Cleaning up after job: " + jobId);
- }
- Joblet joblet = jobletMap.remove(jobId);
- if (joblet != null) {
- partitionManager.unregisterPartitions(jobId);
- joblet.close();
- }
- }
-
- public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, TaskProfile taskProfile) throws Exception {
- try {
- ccs.notifyTaskComplete(jobId, taskId, id, taskProfile);
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
-
- public void notifyTaskFailed(JobId jobId, TaskAttemptId taskId, Exception exception) {
- try {
- ccs.notifyTaskFailure(jobId, taskId, id, exception);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void notifyRegistration(IClusterController ccs) throws Exception {
- this.ccs = ccs;
- }
-
- @Override
- public NCConfig getConfiguration() throws Exception {
- return ncConfig;
- }
-
private class HeartbeatTask extends TimerTask {
private IClusterController cc;
@@ -483,70 +350,4 @@
}
}
}
-
- @Override
- public synchronized void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
- }
- Joblet ji = jobletMap.get(jobId);
- if (ji != null) {
- Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
- for (TaskAttemptId taId : tasks) {
- Task task = taskMap.get(taId);
- if (task != null) {
- task.abort();
- }
- }
- ji.close();
- }
- }
-
- @Override
- public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
- throws Exception {
- NCApplicationContext appCtx;
- synchronized (applications) {
- if (applications.containsKey(appName)) {
- throw new HyracksException("Duplicate application with name: " + appName + " being created.");
- }
- appCtx = new NCApplicationContext(serverCtx, ctx, appName, id);
- applications.put(appName, appCtx);
- }
- if (deployHar) {
- HttpClient hc = new DefaultHttpClient();
- HttpGet get = new HttpGet("http://" + ncConfig.ccHost + ":"
- + nodeParameters.getClusterControllerInfo().getWebPort() + "/applications/" + appName);
- HttpResponse response = hc.execute(get);
- InputStream is = response.getEntity().getContent();
- OutputStream os = appCtx.getHarOutputStream();
- try {
- IOUtils.copyLarge(is, os);
- } finally {
- os.close();
- is.close();
- }
- }
- appCtx.initializeClassPath();
- appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
- appCtx.initialize();
- }
-
- @Override
- public void destroyApplication(String appName) throws Exception {
- ApplicationContext appCtx = applications.remove(appName);
- if (appCtx != null) {
- appCtx.deinitialize();
- }
- }
-
- @Override
- public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
- Joblet ji = jobletMap.get(pid.getJobId());
- if (ji != null) {
- PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ctx, connectionManager,
- new InetSocketAddress(networkAddress.getIpAddress(), networkAddress.getPort()), pid, 1));
- ji.reportPartitionAvailability(channel);
- }
- }
}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 6a73902..0cb938b 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -249,7 +249,11 @@
joblet.notifyTaskComplete(this);
} catch (Exception e) {
e.printStackTrace();
- joblet.notifyTaskFailed(this, e);
+ try {
+ joblet.notifyTaskFailed(this, e);
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ }
} finally {
ct.setName(threadName);
close();
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
new file mode 100644
index 0000000..6549a74
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class AbortTasksWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(AbortTasksWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ private final JobId jobId;
+
+ private final List<TaskAttemptId> tasks;
+
+ public AbortTasksWork(NodeControllerService ncs, JobId jobId, List<TaskAttemptId> tasks) {
+ this.ncs = ncs;
+ this.jobId = jobId;
+ this.tasks = tasks;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
+ }
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet ji = jobletMap.get(jobId);
+ if (ji != null) {
+ Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
+ for (TaskAttemptId taId : tasks) {
+ Task task = taskMap.get(taId);
+ if (task != null) {
+ task.abort();
+ }
+ }
+ ji.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
new file mode 100644
index 0000000..50c05df
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobWork.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class CleanupJobWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(CleanupJobWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ private final JobId jobId;
+
+ public CleanupJobWork(NodeControllerService ncs, JobId jobId) {
+ this.ncs = ncs;
+ this.jobId = jobId;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Cleaning up after job: " + jobId);
+ }
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet joblet = jobletMap.remove(jobId);
+ if (joblet != null) {
+ ncs.getPartitionManager().unregisterPartitions(jobId);
+ joblet.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
new file mode 100644
index 0000000..120e376
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+public class CreateApplicationWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(CreateApplicationWork.class.getName()); // TODO(review): LOGGER is unused in this class — use it or drop it (and the Logger import)
+
+ private final NodeControllerService ncs;
+
+ private final String appName;
+
+ private final boolean deployHar;
+
+ private final byte[] serializedDistributedState;
+
+ public CreateApplicationWork(NodeControllerService ncs, String appName, boolean deployHar,
+ byte[] serializedDistributedState) {
+ this.ncs = ncs;
+ this.appName = appName;
+ this.deployHar = deployHar;
+ this.serializedDistributedState = serializedDistributedState;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ NCApplicationContext appCtx;
+ Map<String, NCApplicationContext> applications = ncs.getApplications();
+ if (applications.containsKey(appName)) {
+ throw new HyracksException("Duplicate application with name: " + appName + " being created.");
+ }
+ appCtx = new NCApplicationContext(ncs.getServerContext(), ncs.getRootContext(), appName, ncs.getId());
+ applications.put(appName, appCtx);
+ if (deployHar) {
+ NCConfig ncConfig = ncs.getConfiguration();
+ NodeParameters nodeParameters = ncs.getNodeParameters();
+ HttpClient hc = new DefaultHttpClient();
+ HttpGet get = new HttpGet("http://" + ncConfig.ccHost + ":"
+ + nodeParameters.getClusterControllerInfo().getWebPort() + "/applications/" + appName);
+ HttpResponse response = hc.execute(get);
+ InputStream is = response.getEntity().getContent();
+ OutputStream os = appCtx.getHarOutputStream();
+ try {
+ IOUtils.copyLarge(is, os);
+ } finally {
+ os.close();
+ is.close();
+ }
+ }
+ appCtx.initializeClassPath();
+ appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
+ appCtx.initialize();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
new file mode 100644
index 0000000..6a5fbfc
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+public class DestroyApplicationWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(DestroyApplicationWork.class.getName()); // TODO(review): LOGGER is unused in this class — use it or drop it (and the Logger import)
+
+ private final NodeControllerService ncs;
+
+ private final String appName;
+
+ public DestroyApplicationWork(NodeControllerService ncs, String appName) {
+ this.ncs = ncs;
+ this.appName = appName;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ Map<String, NCApplicationContext> applications = ncs.getApplications();
+ ApplicationContext appCtx = applications.remove(appName);
+ if (appCtx != null) {
+ appCtx.deinitialize();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
new file mode 100644
index 0000000..bfa21c9
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
+
+public class ReportPartitionAvailabilityWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(ReportPartitionAvailabilityWork.class.getName()); // TODO(review): LOGGER is unused in this class — use it or drop it (and the Logger import)
+
+ private final NodeControllerService ncs;
+
+ private final PartitionId pid;
+
+ private final NetworkAddress networkAddress;
+
+ public ReportPartitionAvailabilityWork(NodeControllerService ncs, PartitionId pid, NetworkAddress networkAddress) {
+ this.ncs = ncs;
+ this.pid = pid;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet ji = jobletMap.get(pid.getJobId());
+ if (ji != null) {
+ PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
+ ncs.getConnectionManager(), new InetSocketAddress(networkAddress.getIpAddress(),
+ networkAddress.getPort()), pid, 1));
+ ji.reportPartitionAvailability(channel);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
new file mode 100644
index 0000000..ed83451
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
+import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
+import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
+
+/**
+ * Work item (executed on the NC's work queue) that deserializes a job's
+ * activity graph and creates, wires up, and starts one Task per supplied
+ * TaskAttemptDescriptor on this node. Wiring covers both input partition
+ * collectors and output partition writers, with materialization on either
+ * side driven by the per-connector IConnectorPolicy.
+ */
+public class StartTasksWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(StartTasksWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ // Application whose context is used to deserialize jagBytes.
+ private final String appName;
+
+ private final JobId jobId;
+
+ // Serialized JobActivityGraph for the job.
+ private final byte[] jagBytes;
+
+ // One descriptor per task attempt to be started on this node.
+ private final List<TaskAttemptDescriptor> taskDescriptors;
+
+ // Per-connector policies; decide send-/receive-side materialization below.
+ private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap;
+
+ public StartTasksWork(NodeControllerService ncs, String appName, JobId jobId, byte[] jagBytes,
+ List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) {
+ this.ncs = ncs;
+ this.appName = appName;
+ this.jobId = jobId;
+ this.jagBytes = jagBytes;
+ this.taskDescriptors = taskDescriptors;
+ this.connectorPoliciesMap = connectorPoliciesMap;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ Map<String, NCApplicationContext> applications = ncs.getApplications();
+ NCApplicationContext appCtx = applications.get(appName);
+ // Deserialize the activity graph through the application context so
+ // application classes resolve correctly.
+ final JobActivityGraph plan = (JobActivityGraph) appCtx.deserialize(jagBytes);
+
+ // Record descriptors are looked up on demand from the job specification.
+ IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
+ @Override
+ public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex) {
+ return plan.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
+ }
+
+ @Override
+ public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex) {
+ return plan.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
+ }
+ };
+
+ final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx);
+
+ for (TaskAttemptDescriptor td : taskDescriptors) {
+ TaskAttemptId taId = td.getTaskAttemptId();
+ TaskId tid = taId.getTaskId();
+ // Resolve the activity this task attempt executes.
+ IActivity han = plan.getActivityNodeMap().get(tid.getActivityId());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Initializing " + taId + " -> " + han);
+ }
+ final int partition = tid.getPartition();
+ Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor());
+ // Operator environment is scoped per (operator, partition) within
+ // the Joblet.
+ IOperatorEnvironment env = joblet.getEnvironment(tid.getActivityId().getOperatorDescriptorId(),
+ tid.getPartition());
+ IOperatorNodePushable operator = han.createPushRuntime(task, env, rdp, partition,
+ td.getPartitionCount());
+
+ List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
+
+ // Build one partition collector per input connector of the activity.
+ List<IConnectorDescriptor> inputs = plan.getActivityInputConnectorDescriptors(tid.getActivityId());
+ if (inputs != null) {
+ for (int i = 0; i < inputs.size(); ++i) {
+ IConnectorDescriptor conn = inputs.get(i);
+ IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("input: " + i + ": " + conn.getConnectorId());
+ }
+ RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
+ IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
+ recordDesc, cPolicy);
+ collectors.add(collector);
+ }
+ }
+ // Attach one frame writer per output connector of the activity.
+ List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getActivityId());
+ if (outputs != null) {
+ for (int i = 0; i < outputs.size(); ++i) {
+ final IConnectorDescriptor conn = outputs.get(i);
+ RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
+ IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
+
+ IPartitionWriterFactory pwFactory = createPartitionWriterFactory(cPolicy, jobId, conn,
+ partition, taId);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("output: " + i + ": " + conn.getConnectorId());
+ }
+ IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
+ td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+ operator.setOutputFrameWriter(i, writer, recordDesc);
+ }
+ }
+
+ task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
+ joblet.addTask(task);
+
+ task.start();
+ }
+ } catch (Exception e) {
+ // NOTE(review): prefer LOGGER over printStackTrace; the exception is
+ // rethrown, so this print duplicates the caller's error handling.
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ // Returns the Joblet for jobId, creating and registering one if absent.
+ // Not synchronized -- presumably safe because work items run serially on the
+ // work-queue thread (TODO confirm).
+ private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx) throws Exception {
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet ji = jobletMap.get(jobId);
+ if (ji == null) {
+ ji = new Joblet(ncs, jobId, appCtx);
+ jobletMap.put(jobId, ji);
+ }
+ return ji;
+ }
+
+ // Creates the collector for input connector i, wrapping it in a
+ // ReceiveSideMaterializingCollector when the connector policy requests
+ // receive-side materialization.
+ private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
+ int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
+ throws HyracksDataException {
+ IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+ td.getInputPartitionCounts()[i], td.getPartitionCount());
+ if (cPolicy.materializeOnReceiveSide()) {
+ return new ReceiveSideMaterializingCollector(ncs.getRootContext(), ncs.getPartitionManager(), collector,
+ task.getTaskAttemptId(), ncs.getExecutor());
+ } else {
+ return collector;
+ }
+ }
+
+ // Chooses how output partitions are written: materialized locally
+ // (MaterializedPartitionWriter) when the policy materializes on the send
+ // side, otherwise pipelined directly to consumers (PipelinedPartition).
+ private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final JobId jobId,
+ final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId) {
+ if (cPolicy.materializeOnSendSide()) {
+ return new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new MaterializedPartitionWriter(ncs.getRootContext(), ncs.getPartitionManager(),
+ new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+ ncs.getExecutor());
+ }
+ };
+ } else {
+ return new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new PipelinedPartition(ncs.getPartitionManager(), new PartitionId(jobId,
+ conn.getConnectorId(), senderIndex, receiverIndex), taId);
+ }
+ };
+ }
+ }
+}
\ No newline at end of file