Grouped hyracks-control-* under hyracks-control
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_isolation@1257 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-control/hyracks-control-nc/pom.xml
new file mode 100644
index 0000000..49f2371
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/pom.xml
@@ -0,0 +1,52 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-control-nc</artifactId>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.dcache</groupId>
+ <artifactId>dcache-client</artifactId>
+ <version>0.0.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-common</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-net</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.0.1</version>
+ </plugin>
+ </plugins>
+ </reporting>
+</project>
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
new file mode 100644
index 0000000..8f7bc4a
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+
+public class Joblet implements IHyracksJobletContext, ICounterContext {
+ private final NodeControllerService nodeController;
+
+ private final INCApplicationContext appCtx;
+
+ private final JobId jobId;
+
+ private final JobActivityGraph jag;
+
+ private final Map<PartitionId, IPartitionCollector> partitionRequestMap;
+
+ private final IOperatorEnvironment env;
+
+ private final Map<TaskId, ITaskState> taskStateMap;
+
+ private final Map<TaskAttemptId, Task> taskMap;
+
+ private final Map<String, Counter> counterMap;
+
+ private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+ private final IWorkspaceFileFactory fileFactory;
+
+ private IJobletEventListener jobletEventListener;
+
+ private JobStatus cleanupStatus;
+
+ private boolean cleanupPending;
+
+ public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx, JobActivityGraph jag) {
+ this.nodeController = nodeController;
+ this.appCtx = appCtx;
+ this.jobId = jobId;
+ this.jag = jag;
+ partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
+ env = new OperatorEnvironmentImpl(nodeController.getId());
+ taskStateMap = new HashMap<TaskId, ITaskState>();
+ taskMap = new HashMap<TaskAttemptId, Task>();
+ counterMap = new HashMap<String, Counter>();
+ deallocatableRegistry = new DefaultDeallocatableRegistry();
+ fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
+ cleanupPending = false;
+ }
+
+ @Override
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public JobActivityGraph getJobActivityGraph() {
+ return jag;
+ }
+
+ public IOperatorEnvironment getEnvironment() {
+ return env;
+ }
+
+ public void addTask(Task task) {
+ taskMap.put(task.getTaskAttemptId(), task);
+ }
+
+ public void removeTask(Task task) {
+ taskMap.remove(task.getTaskAttemptId());
+ if (cleanupPending && taskMap.isEmpty()) {
+ performCleanup();
+ }
+ }
+
+ public Map<TaskAttemptId, Task> getTaskMap() {
+ return taskMap;
+ }
+
+ private final class OperatorEnvironmentImpl implements IOperatorEnvironment {
+ private final String nodeId;
+
+ public OperatorEnvironmentImpl(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public String toString() {
+ return super.toString() + "@" + nodeId;
+ }
+
+ @Override
+ public void setTaskState(ITaskState taskState) {
+ taskStateMap.put(taskState.getTaskId(), taskState);
+ }
+
+ @Override
+ public ITaskState getTaskState(TaskId taskId) {
+ return taskStateMap.get(taskId);
+ }
+ }
+
+ public NodeControllerService getNodeController() {
+ return nodeController;
+ }
+
+ public void dumpProfile(JobletProfile jProfile) {
+ Map<String, Long> counters = jProfile.getCounters();
+ for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
+ counters.put(e.getKey(), e.getValue().get());
+ }
+ for (Task task : taskMap.values()) {
+ TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(),
+ new Hashtable<PartitionId, PartitionProfile>(task.getPartitionSendProfile()));
+ task.dumpProfile(taskProfile);
+ jProfile.getTaskProfiles().put(task.getTaskAttemptId(), taskProfile);
+ }
+ }
+
+ @Override
+ public INCApplicationContext getApplicationContext() {
+ return appCtx;
+ }
+
+ @Override
+ public ICounterContext getCounterContext() {
+ return this;
+ }
+
+ @Override
+ public void registerDeallocatable(IDeallocatable deallocatable) {
+ deallocatableRegistry.registerDeallocatable(deallocatable);
+ }
+
+ public void close() {
+ nodeController.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ deallocatableRegistry.close();
+ }
+ });
+ }
+
+ @Override
+ public ByteBuffer allocateFrame() {
+ return appCtx.getRootContext().allocateFrame();
+ }
+
+ @Override
+ public int getFrameSize() {
+ return appCtx.getRootContext().getFrameSize();
+ }
+
+ @Override
+ public IIOManager getIOManager() {
+ return appCtx.getRootContext().getIOManager();
+ }
+
+ @Override
+ public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
+ return fileFactory.createManagedWorkspaceFile(prefix);
+ }
+
+ @Override
+ public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
+ return fileFactory.createUnmanagedWorkspaceFile(prefix);
+ }
+
+ @Override
+ public synchronized ICounter getCounter(String name, boolean create) {
+ Counter counter = counterMap.get(name);
+ if (counter == null && create) {
+ counter = new Counter(name);
+ counterMap.put(name, counter);
+ }
+ return counter;
+ }
+
+ public synchronized void advertisePartitionRequest(TaskAttemptId taId, Collection<PartitionId> pids,
+ IPartitionCollector collector, PartitionState minState) throws Exception {
+ for (PartitionId pid : pids) {
+ partitionRequestMap.put(pid, collector);
+ PartitionRequest req = new PartitionRequest(pid, nodeController.getId(), taId, minState);
+ nodeController.getClusterController().registerPartitionRequest(req);
+ }
+ }
+
+ public synchronized void reportPartitionAvailability(PartitionChannel channel) throws HyracksException {
+ IPartitionCollector collector = partitionRequestMap.get(channel.getPartitionId());
+ if (collector != null) {
+ collector.addPartitions(Collections.singleton(channel));
+ }
+ }
+
+ public IJobletEventListener getJobletEventListener() {
+ return jobletEventListener;
+ }
+
+ public void setJobletEventListener(IJobletEventListener jobletEventListener) {
+ this.jobletEventListener = jobletEventListener;
+ }
+
+ public void cleanup(JobStatus status) {
+ cleanupStatus = status;
+ cleanupPending = true;
+ if (taskMap.isEmpty()) {
+ performCleanup();
+ }
+ }
+
+ private void performCleanup() {
+ nodeController.getJobletMap().remove(jobId);
+ IJobletEventListener listener = getJobletEventListener();
+ if (listener != null) {
+ listener.jobletFinish(cleanupStatus);
+ }
+ close();
+ cleanupPending = false;
+ try {
+ nodeController.getClusterController().notifyJobletCleanup(jobId, nodeController.getId());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
new file mode 100644
index 0000000..dde7abc
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import org.kohsuke.args4j.CmdLineParser;
+
+import edu.uci.ics.dcache.client.DCacheClient;
+import edu.uci.ics.dcache.client.DCacheClientConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+
+public class NCDriver {
+ public static void main(String args[]) throws Exception {
+ NCConfig ncConfig = new NCConfig();
+ CmdLineParser cp = new CmdLineParser(ncConfig);
+ try {
+ cp.parseArgument(args);
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ cp.printUsage(System.err);
+ return;
+ }
+
+ DCacheClientConfig dccConfig = new DCacheClientConfig();
+ dccConfig.servers = ncConfig.dcacheClientServers;
+ dccConfig.serverLocal = ncConfig.dcacheClientServerLocal;
+ dccConfig.path = ncConfig.dcacheClientPath;
+
+ DCacheClient.get().init(dccConfig);
+
+ final NodeControllerService nService = new NodeControllerService(ncConfig);
+ nService.start();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ nService.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ while (true) {
+ Thread.sleep(10000);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
new file mode 100644
index 0000000..f3d00ac
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -0,0 +1,417 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import java.io.File;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadMXBean;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.WorkQueue;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
+import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
+import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
+import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
+import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
+import edu.uci.ics.hyracks.control.nc.work.CreateApplicationWork;
+import edu.uci.ics.hyracks.control.nc.work.DestroyApplicationWork;
+import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
+import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.IPCPerformanceCounters;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class NodeControllerService extends AbstractRemoteService {
+ private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
+
+ private NCConfig ncConfig;
+
+ private final String id;
+
+ private final IHyracksRootContext ctx;
+
+ private final IPCSystem ipc;
+
+ private final PartitionManager partitionManager;
+
+ private final NetworkManager netManager;
+
+ private final WorkQueue queue;
+
+ private final Timer timer;
+
+ private boolean registrationPending;
+
+ private Exception registrationException;
+
+ private IClusterController ccs;
+
+ private final Map<JobId, Joblet> jobletMap;
+
+ private final Executor executor;
+
+ private NodeParameters nodeParameters;
+
+ private HeartbeatTask heartbeatTask;
+
+ private final ServerContext serverCtx;
+
+ private final Map<String, NCApplicationContext> applications;
+
+ private final MemoryMXBean memoryMXBean;
+
+ private final List<GarbageCollectorMXBean> gcMXBeans;
+
+ private final ThreadMXBean threadMXBean;
+
+ private final RuntimeMXBean runtimeMXBean;
+
+ private final OperatingSystemMXBean osMXBean;
+
+ public NodeControllerService(NCConfig ncConfig) throws Exception {
+ this.ncConfig = ncConfig;
+ id = ncConfig.nodeId;
+ executor = Executors.newCachedThreadPool();
+ NodeControllerIPCI ipci = new NodeControllerIPCI();
+ ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
+ new CCNCFunctions.SerializerDeserializer());
+ this.ctx = new RootHyracksContext(ncConfig.frameSize, new IOManager(getDevices(ncConfig.ioDevices), executor));
+ if (id == null) {
+ throw new Exception("id not set");
+ }
+ partitionManager = new PartitionManager(this);
+ netManager = new NetworkManager(ctx, getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
+
+ queue = new WorkQueue();
+ jobletMap = new Hashtable<JobId, Joblet>();
+ timer = new Timer(true);
+ serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
+ NodeControllerService.class.getName()), id));
+ applications = new Hashtable<String, NCApplicationContext>();
+ memoryMXBean = ManagementFactory.getMemoryMXBean();
+ gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
+ threadMXBean = ManagementFactory.getThreadMXBean();
+ runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+ osMXBean = ManagementFactory.getOperatingSystemMXBean();
+ registrationPending = true;
+ }
+
+ public IHyracksRootContext getRootContext() {
+ return ctx;
+ }
+
+ private static List<IODeviceHandle> getDevices(String ioDevices) {
+ List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
+ StringTokenizer tok = new StringTokenizer(ioDevices, ",");
+ while (tok.hasMoreElements()) {
+ String devPath = tok.nextToken().trim();
+ devices.add(new IODeviceHandle(new File(devPath), "."));
+ }
+ return devices;
+ }
+
+ private synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+ this.nodeParameters = parameters;
+ this.registrationException = exception;
+ this.registrationPending = false;
+ notifyAll();
+ }
+
+ @Override
+ public void start() throws Exception {
+ LOGGER.log(Level.INFO, "Starting NodeControllerService");
+ ipc.start();
+ netManager.start();
+ IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
+ this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
+ HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
+ for (int i = 0; i < gcInfos.length; ++i) {
+ gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
+ }
+ HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
+ ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
+ osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+ runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
+ .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+ runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
+
+ synchronized (this) {
+ while (registrationPending) {
+ wait();
+ }
+ }
+ if (registrationException != null) {
+ throw registrationException;
+ }
+
+ queue.start();
+
+ heartbeatTask = new HeartbeatTask(ccs);
+
+ // Schedule heartbeat generator.
+ timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
+
+ if (nodeParameters.getProfileDumpPeriod() > 0) {
+ // Schedule profile dump generator.
+ timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
+ }
+
+ LOGGER.log(Level.INFO, "Started NodeControllerService");
+ }
+
+ @Override
+ public void stop() throws Exception {
+ LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+ partitionManager.close();
+ heartbeatTask.cancel();
+ netManager.stop();
+ queue.stop();
+ LOGGER.log(Level.INFO, "Stopped NodeControllerService");
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public ServerContext getServerContext() {
+ return serverCtx;
+ }
+
+ public Map<String, NCApplicationContext> getApplications() {
+ return applications;
+ }
+
+ public Map<JobId, Joblet> getJobletMap() {
+ return jobletMap;
+ }
+
+ public NetworkManager getNetworkManager() {
+ return netManager;
+ }
+
+ public PartitionManager getPartitionManager() {
+ return partitionManager;
+ }
+
+ public IClusterController getClusterController() {
+ return ccs;
+ }
+
+ public NodeParameters getNodeParameters() {
+ return nodeParameters;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public NCConfig getConfiguration() throws Exception {
+ return ncConfig;
+ }
+
+ public WorkQueue getWorkQueue() {
+ return queue;
+ }
+
+ private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
+ String ipaddrStr = ncConfig.dataIPAddress;
+ ipaddrStr = ipaddrStr.trim();
+ Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
+ Matcher m = pattern.matcher(ipaddrStr);
+ if (!m.matches()) {
+ throw new Exception(MessageFormat.format(
+ "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
+ }
+ byte[] ipBytes = new byte[4];
+ ipBytes[0] = (byte) Integer.parseInt(m.group(1));
+ ipBytes[1] = (byte) Integer.parseInt(m.group(2));
+ ipBytes[2] = (byte) Integer.parseInt(m.group(3));
+ ipBytes[3] = (byte) Integer.parseInt(m.group(4));
+ return InetAddress.getByAddress(ipBytes);
+ }
+
+ private class HeartbeatTask extends TimerTask {
+ private IClusterController cc;
+
+ private final HeartbeatData hbData;
+
+ public HeartbeatTask(IClusterController cc) {
+ this.cc = cc;
+ hbData = new HeartbeatData();
+ hbData.gcCollectionCounts = new long[gcMXBeans.size()];
+ hbData.gcCollectionTimes = new long[gcMXBeans.size()];
+ }
+
+ @Override
+ public void run() {
+ MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
+ hbData.heapInitSize = heapUsage.getInit();
+ hbData.heapUsedSize = heapUsage.getUsed();
+ hbData.heapCommittedSize = heapUsage.getCommitted();
+ hbData.heapMaxSize = heapUsage.getMax();
+ MemoryUsage nonheapUsage = memoryMXBean.getNonHeapMemoryUsage();
+ hbData.nonheapInitSize = nonheapUsage.getInit();
+ hbData.nonheapUsedSize = nonheapUsage.getUsed();
+ hbData.nonheapCommittedSize = nonheapUsage.getCommitted();
+ hbData.nonheapMaxSize = nonheapUsage.getMax();
+ hbData.threadCount = threadMXBean.getThreadCount();
+ hbData.peakThreadCount = threadMXBean.getPeakThreadCount();
+ hbData.totalStartedThreadCount = threadMXBean.getTotalStartedThreadCount();
+ hbData.systemLoadAverage = osMXBean.getSystemLoadAverage();
+ int gcN = gcMXBeans.size();
+ for (int i = 0; i < gcN; ++i) {
+ GarbageCollectorMXBean gcMXBean = gcMXBeans.get(i);
+ hbData.gcCollectionCounts[i] = gcMXBean.getCollectionCount();
+ hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
+ }
+
+ MuxDemuxPerformanceCounters netPC = netManager.getPerformanceCounters();
+ hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
+ hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
+ hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
+ hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
+
+ IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
+ hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
+ hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
+ hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
+ hbData.ipcMessageBytesReceived = ipcPC.getMessageBytesReceived();
+
+ try {
+ cc.nodeHeartbeat(id, hbData);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private class ProfileDumpTask extends TimerTask {
+ private IClusterController cc;
+
+ public ProfileDumpTask(IClusterController cc) {
+ this.cc = cc;
+ }
+
+ @Override
+ public void run() {
+ try {
+ FutureValue<List<JobProfile>> fv = new FutureValue<List<JobProfile>>();
+ BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
+ queue.scheduleAndSync(bjpw);
+ List<JobProfile> profiles = fv.get();
+ if (!profiles.isEmpty()) {
+ cc.reportProfile(id, profiles);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private final class NodeControllerIPCI implements IIPCI {
+ @Override
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+ CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
+ switch (fn.getFunctionId()) {
+ case START_TASKS: {
+ CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
+ queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
+ .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies()));
+ return;
+ }
+
+ case ABORT_TASKS: {
+ CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
+ queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
+ return;
+ }
+
+ case CLEANUP_JOBLET: {
+ CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
+ queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
+ return;
+ }
+
+ case CREATE_APPLICATION: {
+ CCNCFunctions.CreateApplicationFunction caf = (CCNCFunctions.CreateApplicationFunction) fn;
+ queue.schedule(new CreateApplicationWork(NodeControllerService.this, caf.getAppName(), caf
+ .isDeployHar(), caf.getSerializedDistributedState()));
+ return;
+ }
+
+ case DESTROY_APPLICATION: {
+ CCNCFunctions.DestroyApplicationFunction daf = (CCNCFunctions.DestroyApplicationFunction) fn;
+ queue.schedule(new DestroyApplicationWork(NodeControllerService.this, daf.getAppName()));
+ return;
+ }
+
+ case REPORT_PARTITION_AVAILABILITY: {
+ CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
+ queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
+ .getPartitionId(), rpaf.getNetworkAddress()));
+ return;
+ }
+
+ case NODE_REGISTRATION_RESULT: {
+ CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
+ setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
+ return;
+ }
+ }
+ throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
new file mode 100644
index 0000000..1bb0b2f
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.state.ITaskState;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+import edu.uci.ics.hyracks.control.nc.work.NotifyTaskCompleteWork;
+import edu.uci.ics.hyracks.control.nc.work.NotifyTaskFailureWork;
+
+public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
+ private final Joblet joblet;
+
+ private final TaskAttemptId taskAttemptId;
+
+ private final String displayName;
+
+ private final Executor executor;
+
+ private final IWorkspaceFileFactory fileFactory;
+
+ private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+ private final Map<String, Counter> counterMap;
+
+ private final IOperatorEnvironment opEnv;
+
+ private final Map<PartitionId, PartitionProfile> partitionSendProfile;
+
+ private final Set<Thread> pendingThreads;
+
+ private IPartitionCollector[] collectors;
+
+ private IOperatorNodePushable operator;
+
+ private volatile boolean failed;
+
+ private ByteArrayOutputStream errorBaos;
+
+ private PrintWriter errorWriter;
+
+ private volatile boolean aborted;
+
+ public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
+ this.joblet = joblet;
+ this.taskAttemptId = taskId;
+ this.displayName = displayName;
+ this.executor = executor;
+ fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
+ deallocatableRegistry = new DefaultDeallocatableRegistry();
+ counterMap = new HashMap<String, Counter>();
+ opEnv = joblet.getEnvironment();
+ partitionSendProfile = new Hashtable<PartitionId, PartitionProfile>();
+ pendingThreads = new LinkedHashSet<Thread>();
+ failed = false;
+ errorBaos = new ByteArrayOutputStream();
+ errorWriter = new PrintWriter(errorBaos, true);
+ }
+
+ public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
+ this.collectors = collectors;
+ this.operator = operator;
+ }
+
+ @Override
+ public ByteBuffer allocateFrame() {
+ return joblet.allocateFrame();
+ }
+
+ @Override
+ public int getFrameSize() {
+ return joblet.getFrameSize();
+ }
+
+ @Override
+ public IIOManager getIOManager() {
+ return joblet.getIOManager();
+ }
+
+ @Override
+ public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
+ return fileFactory.createUnmanagedWorkspaceFile(prefix);
+ }
+
+ @Override
+ public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
+ return fileFactory.createManagedWorkspaceFile(prefix);
+ }
+
+ @Override
+ public void registerDeallocatable(IDeallocatable deallocatable) {
+ deallocatableRegistry.registerDeallocatable(deallocatable);
+ }
+
+ public void close() {
+ deallocatableRegistry.close();
+ }
+
+ @Override
+ public IHyracksJobletContext getJobletContext() {
+ return joblet;
+ }
+
+ @Override
+ public TaskAttemptId getTaskAttemptId() {
+ return taskAttemptId;
+ }
+
+ @Override
+ public ICounter getCounter(String name, boolean create) {
+ Counter counter = counterMap.get(name);
+ if (counter == null && create) {
+ counter = new Counter(name);
+ counterMap.put(name, counter);
+ }
+ return counter;
+ }
+
+ @Override
+ public ICounterContext getCounterContext() {
+ return this;
+ }
+
+ public Joblet getJoblet() {
+ return joblet;
+ }
+
+ public Map<PartitionId, PartitionProfile> getPartitionSendProfile() {
+ return partitionSendProfile;
+ }
+
+ public synchronized void dumpProfile(TaskProfile tProfile) {
+ Map<String, Long> dumpMap = tProfile.getCounters();
+ for (Counter c : counterMap.values()) {
+ dumpMap.put(c.getName(), c.get());
+ }
+ }
+
+ public void setPartitionSendProfile(PartitionProfile profile) {
+ partitionSendProfile.put(profile.getPartitionId(), profile);
+ }
+
+ public void start() throws HyracksException {
+ aborted = false;
+ executor.execute(this);
+ }
+
+ public synchronized void abort() {
+ aborted = true;
+ for (IPartitionCollector c : collectors) {
+ c.abort();
+ }
+ for (Thread t : pendingThreads) {
+ t.interrupt();
+ }
+ }
+
+ private synchronized void addPendingThread(Thread t) {
+ pendingThreads.add(t);
+ }
+
+ private synchronized void removePendingThread(Thread t) {
+ pendingThreads.remove(t);
+ if (pendingThreads.isEmpty()) {
+ notifyAll();
+ }
+ }
+
+ public synchronized void waitForCompletion() throws InterruptedException {
+ while (!pendingThreads.isEmpty()) {
+ wait();
+ }
+ }
+
+ @Override
+ public void run() {
+ Thread ct = Thread.currentThread();
+ String threadName = ct.getName();
+ addPendingThread(ct);
+ try {
+ ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
+ operator.initialize();
+ try {
+ if (collectors.length > 0) {
+ final Semaphore sem = new Semaphore(collectors.length - 1);
+ for (int i = 1; i < collectors.length; ++i) {
+ final IPartitionCollector collector = collectors[i];
+ final IFrameWriter writer = operator.getInputFrameWriter(i);
+ sem.acquire();
+ final int cIdx = i;
+ executor.execute(new Runnable() {
+ public void run() {
+ if (aborted) {
+ return;
+ }
+ Thread thread = Thread.currentThread();
+ addPendingThread(thread);
+ String oldName = thread.getName();
+ thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+ try {
+ pushFrames(collector, writer);
+ } catch (HyracksDataException e) {
+ synchronized (Task.this) {
+ failed = true;
+ errorWriter.println("Exception caught by thread: " + thread.getName());
+ e.printStackTrace(errorWriter);
+ errorWriter.println();
+ }
+ } finally {
+ thread.setName(oldName);
+ sem.release();
+ removePendingThread(thread);
+ }
+ }
+ });
+ }
+ try {
+ pushFrames(collectors[0], operator.getInputFrameWriter(0));
+ } finally {
+ sem.acquire(collectors.length - 1);
+ }
+ }
+ } finally {
+ operator.deinitialize();
+ }
+ NodeControllerService ncs = joblet.getNodeController();
+ ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
+ } catch (Exception e) {
+ failed = true;
+ errorWriter.println("Exception caught by thread: " + ct.getName());
+ e.printStackTrace(errorWriter);
+ errorWriter.println();
+ } finally {
+ ct.setName(threadName);
+ close();
+ removePendingThread(ct);
+ }
+ if (failed) {
+ errorWriter.close();
+ NodeControllerService ncs = joblet.getNodeController();
+ try {
+ ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, errorBaos.toString("UTF-8")));
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void pushFrames(IPartitionCollector collector, IFrameWriter writer) throws HyracksDataException {
+ if (aborted) {
+ return;
+ }
+ try {
+ collector.open();
+ try {
+ joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector,
+ PartitionState.STARTED);
+ IFrameReader reader = collector.getReader();
+ reader.open();
+ try {
+ writer.open();
+ ByteBuffer buffer = allocateFrame();
+ while (reader.nextFrame(buffer)) {
+ if (aborted) {
+ return;
+ }
+ buffer.flip();
+ writer.nextFrame(buffer);
+ buffer.compact();
+ }
+ writer.close();
+ } catch (Exception e) {
+ writer.fail();
+ throw e;
+ } finally {
+ reader.close();
+ }
+ } finally {
+ collector.close();
+ }
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void setTaskState(ITaskState taskState) {
+ opEnv.setTaskState(taskState);
+ }
+
+ @Override
+ public ITaskState getTaskState(TaskId taskId) {
+ return opEnv.getTaskState(taskId);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
new file mode 100644
index 0000000..1121c6c
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
@@ -0,0 +1,60 @@
+package edu.uci.ics.hyracks.control.nc.application;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCBootstrap;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
+
+public class NCApplicationContext extends ApplicationContext implements INCApplicationContext {
+ private final String nodeId;
+ private final IHyracksRootContext rootCtx;
+ private Object appObject;
+
+ public NCApplicationContext(ServerContext serverCtx, IHyracksRootContext rootCtx, String appName, String nodeId)
+ throws IOException {
+ super(serverCtx, appName);
+ this.nodeId = nodeId;
+ this.rootCtx = rootCtx;
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setDistributedState(Serializable state) {
+ distributedState = state;
+ }
+
+ @Override
+ protected void start() throws Exception {
+ ((INCBootstrap) bootstrap).setApplicationContext(this);
+ bootstrap.start();
+ }
+
+ @Override
+ protected void stop() throws Exception {
+ if (bootstrap != null) {
+ bootstrap.stop();
+ }
+ }
+
+ @Override
+ public IHyracksRootContext getRootContext() {
+ return rootCtx;
+ }
+
+ @Override
+ public void setApplicationObject(Object object) {
+ this.appObject = object;
+ }
+
+ @Override
+ public Object getApplicationObject() {
+ return appObject;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
new file mode 100644
index 0000000..c3105ab
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileHandle;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOFuture;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+
+public class IOManager implements IIOManager {
+ private final List<IODeviceHandle> ioDevices;
+
+ private final Executor executor;
+
+ private final List<IODeviceHandle> workAreaIODevices;
+
+ private int workAreaDeviceIndex;
+
+ public IOManager(List<IODeviceHandle> devices, Executor executor) throws HyracksException {
+ this.ioDevices = Collections.unmodifiableList(devices);
+ this.executor = executor;
+ workAreaIODevices = new ArrayList<IODeviceHandle>();
+ for (IODeviceHandle d : ioDevices) {
+ if (d.getWorkAreaPath() != null) {
+ new File(d.getPath(), d.getWorkAreaPath()).mkdirs();
+ workAreaIODevices.add(d);
+ }
+ }
+ if (workAreaIODevices.isEmpty()) {
+ throw new HyracksException("No devices with work areas found");
+ }
+ workAreaDeviceIndex = 0;
+ }
+
+ @Override
+ public List<IODeviceHandle> getIODevices() {
+ return ioDevices;
+ }
+
+ @Override
+ public FileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
+ throws HyracksDataException {
+ FileHandle fHandle = new FileHandle(fileRef);
+ try {
+ fHandle.open(rwMode, syncMode);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ return fHandle;
+ }
+
+ @Override
+ public int syncWrite(FileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ try {
+ int n = 0;
+ int remaining = data.remaining();
+ while (remaining > 0) {
+ int len = fHandle.getFileChannel().write(data, offset);
+ if (len < 0) {
+ throw new HyracksDataException("Error writing to file: " + fHandle.getFileReference().toString());
+ }
+ remaining -= len;
+ offset += len;
+ n += len;
+ }
+ return n;
+ } catch (HyracksDataException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public int syncRead(FileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ try {
+ int n = 0;
+ int remaining = data.remaining();
+ while (remaining > 0) {
+ int len = fHandle.getFileChannel().read(data, offset);
+ if (len < 0) {
+ return -1;
+ }
+ remaining -= len;
+ offset += len;
+ n += len;
+ }
+ return n;
+ } catch (HyracksDataException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public IIOFuture asyncWrite(FileHandle fHandle, long offset, ByteBuffer data) {
+ AsyncWriteRequest req = new AsyncWriteRequest(fHandle, offset, data);
+ executor.execute(req);
+ return req;
+ }
+
+ @Override
+ public IIOFuture asyncRead(FileHandle fHandle, long offset, ByteBuffer data) {
+ AsyncReadRequest req = new AsyncReadRequest(fHandle, offset, data);
+ executor.execute(req);
+ return req;
+ }
+
+ @Override
+ public void close(FileHandle fHandle) throws HyracksDataException {
+ try {
+ fHandle.close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public synchronized FileReference createWorkspaceFile(String prefix) throws HyracksDataException {
+ IODeviceHandle dev = workAreaIODevices.get(workAreaDeviceIndex);
+ workAreaDeviceIndex = (workAreaDeviceIndex + 1) % workAreaIODevices.size();
+ String waPath = dev.getWorkAreaPath();
+ File waf;
+ try {
+ waf = File.createTempFile(prefix, ".waf", new File(dev.getPath(), waPath));
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ return dev.createFileReference(waPath + File.separator + waf.getName());
+ }
+
+ private abstract class AsyncRequest implements IIOFuture, Runnable {
+ protected final FileHandle fHandle;
+ protected final long offset;
+ protected final ByteBuffer data;
+ private boolean complete;
+ private HyracksDataException exception;
+ private int result;
+
+ private AsyncRequest(FileHandle fHandle, long offset, ByteBuffer data) {
+ this.fHandle = fHandle;
+ this.offset = offset;
+ this.data = data;
+ complete = false;
+ exception = null;
+ }
+
+ @Override
+ public void run() {
+ HyracksDataException hde = null;
+ int res = -1;
+ try {
+ res = performOperation();
+ } catch (HyracksDataException e) {
+ hde = e;
+ }
+ synchronized (this) {
+ exception = hde;
+ result = res;
+ complete = true;
+ notifyAll();
+ }
+ }
+
+ protected abstract int performOperation() throws HyracksDataException;
+
+ @Override
+ public synchronized int synchronize() throws HyracksDataException, InterruptedException {
+ while (!complete) {
+ wait();
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ return result;
+ }
+
+ @Override
+ public synchronized boolean isComplete() {
+ return complete;
+ }
+ }
+
+ private class AsyncReadRequest extends AsyncRequest {
+ private AsyncReadRequest(FileHandle fHandle, long offset, ByteBuffer data) {
+ super(fHandle, offset, data);
+ }
+
+ @Override
+ protected int performOperation() throws HyracksDataException {
+ return syncRead(fHandle, offset, data);
+ }
+ }
+
+ private class AsyncWriteRequest extends AsyncRequest {
+ private AsyncWriteRequest(FileHandle fHandle, long offset, ByteBuffer data) {
+ super(fHandle, offset, data);
+ }
+
+ @Override
+ protected int performOperation() throws HyracksDataException {
+ return syncWrite(fHandle, offset, data);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/WorkspaceFileFactory.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/WorkspaceFileFactory.java
new file mode 100644
index 0000000..16421ab
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/WorkspaceFileFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.io;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
+
+public final class WorkspaceFileFactory implements IWorkspaceFileFactory {
+ private final IDeallocatableRegistry registry;
+ private final IOManager ioManager;
+
+ public WorkspaceFileFactory(IDeallocatableRegistry registry, IOManager ioManager) {
+ this.registry = registry;
+ this.ioManager = ioManager;
+ }
+
+ @Override
+ public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
+ final FileReference fRef = ioManager.createWorkspaceFile(prefix);
+ registry.registerDeallocatable(new IDeallocatable() {
+ @Override
+ public void deallocate() {
+ fRef.delete();
+ }
+ });
+ return fRef;
+ }
+
+ @Override
+ public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
+ return ioManager.createWorkspaceFile(prefix);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
new file mode 100644
index 0000000..609d7f0
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class NetworkInputChannel implements IInputChannel {
+ private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
+
+ private IHyracksRootContext ctx;
+
+ private final NetworkManager netManager;
+
+ private final SocketAddress remoteAddress;
+
+ private final PartitionId partitionId;
+
+ private final Queue<ByteBuffer> fullQueue;
+
+ private final int nBuffers;
+
+ private ChannelControlBlock ccb;
+
+ private IInputChannelMonitor monitor;
+
+ private Object attachment;
+
+ public NetworkInputChannel(IHyracksRootContext ctx, NetworkManager netManager, SocketAddress remoteAddress,
+ PartitionId partitionId, int nBuffers) {
+ this.ctx = ctx;
+ this.netManager = netManager;
+ this.remoteAddress = remoteAddress;
+ this.partitionId = partitionId;
+ fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+ this.nBuffers = nBuffers;
+ }
+
+ @Override
+ public void registerMonitor(IInputChannelMonitor monitor) {
+ this.monitor = monitor;
+ }
+
+ @Override
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ @Override
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ @Override
+ public synchronized ByteBuffer getNextBuffer() {
+ return fullQueue.poll();
+ }
+
+ @Override
+ public void recycleBuffer(ByteBuffer buffer) {
+ buffer.clear();
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ try {
+ ccb = netManager.connect(remoteAddress);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+ for (int i = 0; i < nBuffers; ++i) {
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
+ }
+ ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
+ writeBuffer.putLong(partitionId.getJobId().getId());
+ writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
+ writeBuffer.putInt(partitionId.getSenderIndex());
+ writeBuffer.putInt(partitionId.getReceiverIndex());
+ writeBuffer.flip();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Sending partition request: " + partitionId + " on channel: " + ccb);
+ }
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+
+ private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ fullQueue.add(buffer);
+ monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
+ }
+
+ @Override
+ public void close() {
+ monitor.notifyEndOfStream(NetworkInputChannel.this);
+ }
+
+ @Override
+ public void error(int ecode) {
+ monitor.notifyFailure(NetworkInputChannel.this);
+ }
+ }
+
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ // do nothing
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
new file mode 100644
index 0000000..68e3120
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.nc.partitions.IPartitionRequestListener;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class NetworkManager {
+ private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
+
+ static final int INITIAL_MESSAGE_SIZE = 20;
+
+ private final IHyracksRootContext ctx;
+
+ private final IPartitionRequestListener partitionRequestListener;
+
+ private final MuxDemux md;
+
+ private NetworkAddress networkAddress;
+
+ public NetworkManager(IHyracksRootContext ctx, InetAddress inetAddress,
+ IPartitionRequestListener partitionRequestListener, int nThreads) throws IOException {
+ this.ctx = ctx;
+ this.partitionRequestListener = partitionRequestListener;
+ md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads);
+ }
+
+ public void start() throws IOException {
+ md.start();
+ InetSocketAddress sockAddr = md.getLocalAddress();
+ networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+
+ public void stop() {
+
+ }
+
+ public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+ MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+ return mConn.openChannel();
+ }
+
+ private class ChannelOpenListener implements IChannelOpenListener {
+ @Override
+ public void channelOpened(ChannelControlBlock channel) {
+ channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+ channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+ }
+ }
+
+ private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+ private final ChannelControlBlock ccb;
+
+ private NetworkOutputChannel noc;
+
+ public InitialBufferAcceptor(ChannelControlBlock ccb) {
+ this.ccb = ccb;
+ }
+
+ @Override
+ public void accept(ByteBuffer buffer) {
+ PartitionId pid = readInitialMessage(buffer);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Received initial partition request: " + pid + " on channel: " + ccb);
+ }
+ noc = new NetworkOutputChannel(ctx, ccb, 5);
+ try {
+ partitionRequestListener.registerPartitionRequest(pid, noc);
+ } catch (HyracksException e) {
+ noc.abort();
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void error(int ecode) {
+ if (noc != null) {
+ noc.abort();
+ }
+ }
+ }
+
+ private static PartitionId readInitialMessage(ByteBuffer buffer) {
+ JobId jobId = new JobId(buffer.getLong());
+ ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
+ int senderIndex = buffer.getInt();
+ int receiverIndex = buffer.getInt();
+ return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
+ }
+
+ public MuxDemuxPerformanceCounters getPerformanceCounters() {
+ return md.getPerformanceCounters();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
new file mode 100644
index 0000000..185768e
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class NetworkOutputChannel implements IFrameWriter {
+ private final ChannelControlBlock ccb;
+
+ private final Deque<ByteBuffer> emptyStack;
+
+ private boolean aborted;
+
+ public NetworkOutputChannel(IHyracksRootContext ctx, ChannelControlBlock ccb, int nBuffers) {
+ this.ccb = ccb;
+ emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+ for (int i = 0; i < nBuffers; ++i) {
+ emptyStack.push(ByteBuffer.allocateDirect(ctx.getFrameSize()));
+ }
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ByteBuffer destBuffer = null;
+ synchronized (this) {
+ while (true) {
+ if (aborted) {
+ throw new HyracksDataException("Connection has been aborted");
+ }
+ destBuffer = emptyStack.poll();
+ if (destBuffer != null) {
+ break;
+ }
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ buffer.position(0);
+ buffer.limit(destBuffer.capacity());
+ destBuffer.clear();
+ destBuffer.put(buffer);
+ destBuffer.flip();
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
+ }
+
+ void abort() {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ synchronized (NetworkOutputChannel.this) {
+ aborted = true;
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
+
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ synchronized (NetworkOutputChannel.this) {
+ emptyStack.push(buffer);
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/IPartitionRequestListener.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/IPartitionRequestListener.java
new file mode 100644
index 0000000..20ed49c
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/IPartitionRequestListener.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public interface IPartitionRequestListener {
+ public void registerPartitionRequest(PartitionId partitionId, IFrameWriter writer) throws HyracksException;
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
new file mode 100644
index 0000000..97f82d9
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileHandle;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializedPartition implements IPartition {
+ private final IHyracksRootContext ctx;
+
+ private final FileReference partitionFile;
+
+ private final Executor executor;
+
+ private final IOManager ioManager;
+
+ public MaterializedPartition(IHyracksRootContext ctx, FileReference partitionFile, Executor executor,
+ IOManager ioManager) {
+ this.ctx = ctx;
+ this.partitionFile = partitionFile;
+ this.executor = executor;
+ this.ioManager = ioManager;
+ }
+
+ @Override
+ public void deallocate() {
+ partitionFile.delete();
+ }
+
+ @Override
+ public void writeTo(final IFrameWriter writer) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ FileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ try {
+ writer.open();
+ try {
+ long offset = 0;
+ ByteBuffer buffer = ctx.allocateFrame();
+ while (true) {
+ buffer.clear();
+ long size = ioManager.syncRead(fh, offset, buffer);
+ if (size < 0) {
+ break;
+ } else if (size < buffer.capacity()) {
+ throw new HyracksDataException("Premature end of file");
+ }
+ offset += size;
+ buffer.flip();
+ writer.nextFrame(buffer);
+ }
+ } finally {
+ writer.close();
+ }
+ } finally {
+ ioManager.close(fh);
+ }
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public boolean isReusable() {
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
new file mode 100644
index 0000000..0ea3124
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class MaterializedPartitionInputChannel implements IInputChannel {
+ private final Queue<ByteBuffer> emptyQueue;
+
+ private final Queue<ByteBuffer> fullQueue;
+
+ private final PartitionId pid;
+
+ private final PartitionManager manager;
+
+ private final FrameWriter writer;
+
+ private IInputChannelMonitor monitor;
+
+ private Object attachment;
+
+ public MaterializedPartitionInputChannel(IHyracksRootContext ctx, int nBuffers, PartitionId pid,
+ PartitionManager manager) {
+ this.emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+ for (int i = 0; i < nBuffers; ++i) {
+ emptyQueue.add(ctx.allocateFrame());
+ }
+ fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+ this.pid = pid;
+ this.manager = manager;
+ writer = new FrameWriter();
+ }
+
+ @Override
+ public void registerMonitor(IInputChannelMonitor monitor) {
+ this.monitor = monitor;
+ }
+
+ @Override
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ @Override
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ @Override
+ public ByteBuffer getNextBuffer() {
+ return fullQueue.poll();
+ }
+
+ @Override
+ public void recycleBuffer(ByteBuffer buffer) {
+ buffer.clear();
+ synchronized (this) {
+ emptyQueue.add(buffer);
+ notifyAll();
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ IPartition partition = manager.getPartition(pid);
+ partition.writeTo(writer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+
+ private class FrameWriter implements IFrameWriter {
+ @Override
+ public void open() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ synchronized (MaterializedPartitionInputChannel.this) {
+ while (emptyQueue.isEmpty()) {
+ try {
+ MaterializedPartitionInputChannel.this.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ ByteBuffer destFrame = emptyQueue.poll();
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ destFrame.clear();
+ destFrame.put(buffer);
+ fullQueue.add(destFrame);
+ monitor.notifyDataAvailability(MaterializedPartitionInputChannel.this, 1);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ monitor.notifyEndOfStream(MaterializedPartitionInputChannel.this);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
new file mode 100644
index 0000000..1a1b15f
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileHandle;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializedPartitionWriter implements IFrameWriter {
+ private static final Logger LOGGER = Logger.getLogger(MaterializedPartitionWriter.class.getName());
+
+ private final IHyracksRootContext ctx;
+
+ private final PartitionManager manager;
+
+ private final PartitionId pid;
+
+ private final TaskAttemptId taId;
+
+ private final Executor executor;
+
+ private FileReference fRef;
+
+ private FileHandle handle;
+
+ private long size;
+
+ private boolean failed;
+
+ public MaterializedPartitionWriter(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
+ TaskAttemptId taId, Executor executor) {
+ this.ctx = ctx;
+ this.manager = manager;
+ this.pid = pid;
+ this.taId = taId;
+ this.executor = executor;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("open(" + pid + " by " + taId);
+ }
+ fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+ handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ size = 0;
+ failed = false;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ size += ctx.getIOManager().syncWrite(handle, size, buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ failed = true;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("close(" + pid + " by " + taId);
+ }
+ ctx.getIOManager().close(handle);
+ if (!failed) {
+ manager.registerPartition(pid, taId,
+ new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()),
+ PartitionState.COMMITTED);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
new file mode 100644
index 0000000..143fd2c
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileHandle;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializingPipelinedPartition implements IFrameWriter, IPartition {
+ private static final Logger LOGGER = Logger.getLogger(MaterializingPipelinedPartition.class.getName());
+
+ private final IHyracksRootContext ctx;
+
+ private final Executor executor;
+
+ private final IOManager ioManager;
+
+ private final PartitionManager manager;
+
+ private final PartitionId pid;
+
+ private final TaskAttemptId taId;
+
+ private FileReference fRef;
+
+ private FileHandle handle;
+
+ private long size;
+
+ private boolean eos;
+
+ private boolean failed;
+
+ public MaterializingPipelinedPartition(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
+ TaskAttemptId taId, Executor executor) {
+ this.ctx = ctx;
+ this.executor = executor;
+ this.ioManager = (IOManager) ctx.getIOManager();
+ this.manager = manager;
+ this.pid = pid;
+ this.taId = taId;
+ }
+
+ @Override
+ public void deallocate() {
+ fRef.delete();
+ }
+
+ @Override
+ public void writeTo(final IFrameWriter writer) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ FileHandle fh = ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ try {
+ writer.open();
+ try {
+ long offset = 0;
+ ByteBuffer buffer = ctx.allocateFrame();
+ boolean fail = false;
+ boolean done = false;
+ while (!fail && !done) {
+ synchronized (MaterializingPipelinedPartition.this) {
+ while (offset >= size && !eos && !failed) {
+ try {
+ MaterializingPipelinedPartition.this.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ fail = failed;
+ done = eos && offset >= size;
+ }
+ if (fail) {
+ writer.fail();
+ } else if (!done) {
+ buffer.clear();
+ long readLen = ioManager.syncRead(fh, offset, buffer);
+ if (readLen < buffer.capacity()) {
+ throw new HyracksDataException("Premature end of file");
+ }
+ offset += readLen;
+ buffer.flip();
+ writer.nextFrame(buffer);
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ } finally {
+ ioManager.close(fh);
+ }
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public boolean isReusable() {
+ return true;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("open(" + pid + " by " + taId);
+ }
+ fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+ handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ size = 0;
+ eos = false;
+ failed = false;
+ manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+ }
+
+ @Override
+ public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ size += ctx.getIOManager().syncWrite(handle, size, buffer);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void fail() throws HyracksDataException {
+ failed = true;
+ notifyAll();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("close(" + pid + " by " + taId);
+ }
+ boolean commit = false;
+ synchronized (this) {
+ eos = true;
+ ctx.getIOManager().close(handle);
+ handle = null;
+ commit = !failed;
+ notifyAll();
+ }
+ if (commit) {
+ manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
new file mode 100644
index 0000000..ce40fe6
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+
+public class PartitionManager implements IPartitionRequestListener {
+ private final NodeControllerService ncs;
+
+ private final Map<PartitionId, List<IPartition>> partitionMap;
+
+ private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+ private final IWorkspaceFileFactory fileFactory;
+
+ public PartitionManager(NodeControllerService ncs) {
+ this.ncs = ncs;
+ partitionMap = new HashMap<PartitionId, List<IPartition>>();
+ deallocatableRegistry = new DefaultDeallocatableRegistry();
+ fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
+ }
+
+ public void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+ throws HyracksDataException {
+ synchronized (this) {
+ List<IPartition> pList = partitionMap.get(pid);
+ if (pList == null) {
+ pList = new ArrayList<IPartition>();
+ partitionMap.put(pid, pList);
+ }
+ pList.add(partition);
+ }
+ updatePartitionState(pid, taId, partition, state);
+ }
+
+ public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+ throws HyracksDataException {
+ PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
+ desc.setState(state);
+ try {
+ ncs.getClusterController().registerPartitionProvider(desc);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public synchronized IPartition getPartition(PartitionId pid) {
+ return partitionMap.get(pid).get(0);
+ }
+
+ public synchronized void unregisterPartitions(JobId jobId, Collection<IPartition> unregisteredPartitions) {
+ for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = partitionMap.entrySet().iterator(); i.hasNext();) {
+ Map.Entry<PartitionId, List<IPartition>> e = i.next();
+ PartitionId pid = e.getKey();
+ if (jobId.equals(pid.getJobId())) {
+ for (IPartition p : e.getValue()) {
+ unregisteredPartitions.add(p);
+ }
+ i.remove();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void registerPartitionRequest(PartitionId partitionId, IFrameWriter writer)
+ throws HyracksException {
+ List<IPartition> pList = partitionMap.get(partitionId);
+ if (pList != null && !pList.isEmpty()) {
+ IPartition partition = pList.get(0);
+ partition.writeTo(writer);
+ if (!partition.isReusable()) {
+ partitionMap.remove(partitionId);
+ }
+ } else {
+ throw new HyracksException("Request for unknown partition " + partitionId);
+ }
+ }
+
+ public IWorkspaceFileFactory getFileFactory() {
+ return fileFactory;
+ }
+
+ public void close() {
+ deallocatableRegistry.close();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
new file mode 100644
index 0000000..92dc0b2
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+
+public class PipelinedPartition implements IFrameWriter, IPartition {
+ private final PartitionManager manager;
+
+ private final PartitionId pid;
+
+ private final TaskAttemptId taId;
+
+ private IFrameWriter delegate;
+
+ private boolean pendingConnection;
+
+ private boolean failed;
+
+ public PipelinedPartition(PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
+ this.manager = manager;
+ this.pid = pid;
+ this.taId = taId;
+ }
+
+ @Override
+ public boolean isReusable() {
+ return false;
+ }
+
+ @Override
+ public void deallocate() {
+ // do nothing
+ }
+
+ @Override
+ public synchronized void writeTo(IFrameWriter writer) {
+ delegate = writer;
+ notifyAll();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+ failed = false;
+ pendingConnection = true;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ensureConnected();
+ delegate.nextFrame(buffer);
+ }
+
+ private void ensureConnected() throws HyracksDataException {
+ if (pendingConnection) {
+ synchronized (this) {
+ while (delegate == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ delegate.open();
+ }
+ pendingConnection = false;
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ ensureConnected();
+ failed = true;
+ if (delegate != null) {
+ delegate.fail();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ ensureConnected();
+ if (!failed) {
+ manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+ }
+ delegate.close();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
new file mode 100644
index 0000000..f93f874
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class ReceiveSideMaterializingCollector implements IPartitionCollector {
+ private final IHyracksRootContext ctx;
+
+ private PartitionManager manager;
+
+ private final IPartitionCollector delegate;
+
+ private final TaskAttemptId taId;
+
+ private final Executor executor;
+
+ public ReceiveSideMaterializingCollector(IHyracksRootContext ctx, PartitionManager manager,
+ IPartitionCollector collector, TaskAttemptId taId, Executor executor) {
+ this.ctx = ctx;
+ this.manager = manager;
+ this.delegate = collector;
+ this.taId = taId;
+ this.executor = executor;
+ }
+
+ @Override
+ public JobId getJobId() {
+ return delegate.getJobId();
+ }
+
+ @Override
+ public ConnectorDescriptorId getConnectorId() {
+ return delegate.getConnectorId();
+ }
+
+ @Override
+ public int getReceiverIndex() {
+ return delegate.getReceiverIndex();
+ }
+
+ @Override
+ public void open() throws HyracksException {
+ delegate.open();
+ }
+
+ @Override
+ public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
+ for (final PartitionChannel pc : partitions) {
+ PartitionWriter writer = new PartitionWriter(pc);
+ executor.execute(writer);
+ }
+ }
+
+ private class PartitionWriter implements Runnable, IInputChannelMonitor {
+ private PartitionChannel pc;
+
+ private int nAvailableFrames;
+
+ private boolean eos;
+
+ private boolean failed;
+
+ public PartitionWriter(PartitionChannel pc) {
+ this.pc = pc;
+ nAvailableFrames = 0;
+ eos = false;
+ }
+
+ @Override
+ public synchronized void notifyFailure(IInputChannel channel) {
+ failed = true;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
+ nAvailableFrames += nFrames;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void notifyEndOfStream(IInputChannel channel) {
+ eos = true;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void run() {
+ PartitionId pid = pc.getPartitionId();
+ MaterializedPartitionWriter mpw = new MaterializedPartitionWriter(ctx, manager, pid, taId, executor);
+ IInputChannel channel = pc.getInputChannel();
+ try {
+ channel.registerMonitor(this);
+ channel.open();
+ mpw.open();
+ while (true) {
+ if (nAvailableFrames > 0) {
+ ByteBuffer buffer = channel.getNextBuffer();
+ --nAvailableFrames;
+ mpw.nextFrame(buffer);
+ channel.recycleBuffer(buffer);
+ } else if (eos) {
+ break;
+ } else if (failed) {
+ throw new HyracksDataException("Failure occurred on input");
+ } else {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ mpw.close();
+ channel.close();
+ delegate.addPartitions(Collections.singleton(new PartitionChannel(pid,
+ new MaterializedPartitionInputChannel(ctx, 5, pid, manager))));
+ } catch (HyracksException e) {
+ }
+ }
+ }
+
+ @Override
+ public IFrameReader getReader() throws HyracksException {
+ return delegate.getReader();
+ }
+
+ @Override
+ public void close() throws HyracksException {
+ delegate.close();
+ }
+
+ @Override
+ public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
+ return delegate.getRequiredPartitionIds();
+ }
+
+ @Override
+ public void abort() {
+ delegate.abort();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java
new file mode 100644
index 0000000..96fcf75
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.profiling;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+
+public class ConnectorReceiverProfilingFrameReader implements IFrameReader {
+ private final IFrameReader reader;
+ private final ICounter openCounter;
+ private final ICounter closeCounter;
+ private final ICounter frameCounter;
+
+ public ConnectorReceiverProfilingFrameReader(IHyracksTaskContext ctx, IFrameReader reader,
+ ConnectorDescriptorId cdId, int receiverIndex) {
+ this.reader = reader;
+ this.openCounter = ctx.getCounterContext().getCounter(cdId + ".receiver." + receiverIndex + ".open", true);
+ this.closeCounter = ctx.getCounterContext().getCounter(cdId + ".receiver." + receiverIndex + ".close", true);
+ this.frameCounter = ctx.getCounterContext()
+ .getCounter(cdId + ".receiver." + receiverIndex + ".nextFrame", true);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ reader.open();
+ openCounter.update(1);
+ }
+
+ @Override
+ public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ boolean status = reader.nextFrame(buffer);
+ if (status) {
+ frameCounter.update(1);
+ }
+ return status;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ reader.close();
+ closeCounter.update(1);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
new file mode 100644
index 0000000..be7b319
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.profiling;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+
+public class ConnectorSenderProfilingFrameWriter implements IFrameWriter {
+ private final IFrameWriter writer;
+ private final ICounter openCounter;
+ private final ICounter closeCounter;
+ private final ICounter frameCounter;
+
+ public ConnectorSenderProfilingFrameWriter(IHyracksTaskContext ctx, IFrameWriter writer,
+ ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
+ this.writer = writer;
+ int attempt = ctx.getTaskAttemptId().getAttempt();
+ this.openCounter = ctx.getCounterContext().getCounter(
+ cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".open", true);
+ this.closeCounter = ctx.getCounterContext().getCounter(
+ cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".close", true);
+ this.frameCounter = ctx.getCounterContext().getCounter(
+ cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".nextFrame", true);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ openCounter.update(1);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ frameCounter.update(1);
+ writer.nextFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ closeCounter.update(1);
+ writer.close();
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
new file mode 100644
index 0000000..affa01c
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.profiling;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class ProfilingPartitionWriterFactory implements IPartitionWriterFactory {
+ private static final int N_SAMPLES = 64;
+
+ private final IHyracksTaskContext ctx;
+
+ private final IConnectorDescriptor cd;
+
+ private final int senderIndex;
+
+ private final IPartitionWriterFactory delegate;
+
+ public ProfilingPartitionWriterFactory(IHyracksTaskContext ctx, IConnectorDescriptor cd, int senderIndex,
+ IPartitionWriterFactory delegate) {
+ this.ctx = ctx;
+ this.cd = cd;
+ this.senderIndex = senderIndex;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public IFrameWriter createFrameWriter(final int receiverIndex) throws HyracksDataException {
+ final IFrameWriter writer = new ConnectorSenderProfilingFrameWriter(ctx,
+ delegate.createFrameWriter(receiverIndex), cd.getConnectorId(), senderIndex, receiverIndex);
+ return new IFrameWriter() {
+ private long openTime;
+
+ private long closeTime;
+
+ MultiResolutionEventProfiler mrep = new MultiResolutionEventProfiler(N_SAMPLES);
+
+ @Override
+ public void open() throws HyracksDataException {
+ openTime = System.currentTimeMillis();
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ mrep.reportEvent();
+ writer.nextFrame(buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ closeTime = System.currentTimeMillis();
+ ((Task) ctx).setPartitionSendProfile(new PartitionProfile(new PartitionId(ctx.getJobletContext()
+ .getJobId(), cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, mrep));
+ writer.close();
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/DefaultDeallocatableRegistry.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/DefaultDeallocatableRegistry.java
new file mode 100644
index 0000000..863ab7b
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/DefaultDeallocatableRegistry.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.resources;
+
+import java.util.List;
+import java.util.Vector;
+
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
+
+public class DefaultDeallocatableRegistry implements IDeallocatableRegistry {
+ private final List<IDeallocatable> deallocatables;
+
+ public DefaultDeallocatableRegistry() {
+ deallocatables = new Vector<IDeallocatable>();
+ }
+
+ @Override
+ public void registerDeallocatable(IDeallocatable deallocatable) {
+ deallocatables.add(deallocatable);
+ }
+
+ public void close() {
+ for (IDeallocatable d : deallocatables) {
+ try {
+ d.deallocate();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
new file mode 100644
index 0000000..045fa52
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.runtime;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+
+public class RootHyracksContext implements IHyracksRootContext {
+ private final int frameSize;
+
+ private final IIOManager ioManager;
+
+ public RootHyracksContext(int frameSize, IIOManager ioManager) {
+ this.frameSize = frameSize;
+ this.ioManager = ioManager;
+ }
+
+ @Override
+ public int getFrameSize() {
+ return frameSize;
+ }
+
+ @Override
+ public IIOManager getIOManager() {
+ return ioManager;
+ }
+
+ @Override
+ public ByteBuffer allocateFrame() {
+ return ByteBuffer.allocate(frameSize);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
new file mode 100644
index 0000000..e8c4052
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class AbortTasksWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(AbortTasksWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ private final JobId jobId;
+
+ private final List<TaskAttemptId> tasks;
+
+ public AbortTasksWork(NodeControllerService ncs, JobId jobId, List<TaskAttemptId> tasks) {
+ this.ncs = ncs;
+ this.jobId = jobId;
+ this.tasks = tasks;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
+ }
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet ji = jobletMap.get(jobId);
+ if (ji != null) {
+ Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
+ for (TaskAttemptId taId : tasks) {
+ Task task = taskMap.get(taId);
+ if (task != null) {
+ task.abort();
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
new file mode 100644
index 0000000..574bc6d
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class BuildJobProfilesWork extends SynchronizableWork {
+ private final NodeControllerService ncs;
+
+ private final FutureValue<List<JobProfile>> fv;
+
+ public BuildJobProfilesWork(NodeControllerService ncs, FutureValue<List<JobProfile>> fv) {
+ this.ncs = ncs;
+ this.fv = fv;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ List<JobProfile> profiles = new ArrayList<JobProfile>();
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ for (Joblet ji : jobletMap.values()) {
+ profiles.add(new JobProfile(ji.getJobId()));
+ }
+ for (JobProfile jProfile : profiles) {
+ Joblet ji;
+ JobletProfile jobletProfile = new JobletProfile(ncs.getId());
+ ji = jobletMap.get(jProfile.getJobId());
+ if (ji != null) {
+ ji.dumpProfile(jobletProfile);
+ jProfile.getJobletProfiles().put(ncs.getId(), jobletProfile);
+ }
+ }
+ fv.setValue(profiles);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
new file mode 100644
index 0000000..b75a1fc
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class CleanupJobletWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(CleanupJobletWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ private final JobId jobId;
+
+ private JobStatus status;
+
+ public CleanupJobletWork(NodeControllerService ncs, JobId jobId, JobStatus status) {
+ this.ncs = ncs;
+ this.jobId = jobId;
+ this.status = status;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Cleaning up after job: " + jobId);
+ }
+ final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
+ ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
+ ncs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ for (IPartition p : unregisteredPartitions) {
+ p.deallocate();
+ }
+ }
+ });
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet joblet = jobletMap.remove(jobId);
+ if (joblet != null) {
+ joblet.cleanup(status);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
new file mode 100644
index 0000000..eb982df
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+public class CreateApplicationWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(CreateApplicationWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ private final String appName;
+
+ private final boolean deployHar;
+
+ private final byte[] serializedDistributedState;
+
+ public CreateApplicationWork(NodeControllerService ncs, String appName, boolean deployHar,
+ byte[] serializedDistributedState) {
+ this.ncs = ncs;
+ this.appName = appName;
+ this.deployHar = deployHar;
+ this.serializedDistributedState = serializedDistributedState;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ NCApplicationContext appCtx;
+ Map<String, NCApplicationContext> applications = ncs.getApplications();
+ if (applications.containsKey(appName)) {
+ throw new HyracksException("Duplicate application with name: " + appName + " being created.");
+ }
+ appCtx = new NCApplicationContext(ncs.getServerContext(), ncs.getRootContext(), appName, ncs.getId());
+ applications.put(appName, appCtx);
+ if (deployHar) {
+ NCConfig ncConfig = ncs.getConfiguration();
+ NodeParameters nodeParameters = ncs.getNodeParameters();
+ HttpClient hc = new DefaultHttpClient();
+ HttpGet get = new HttpGet("http://" + ncConfig.ccHost + ":"
+ + nodeParameters.getClusterControllerInfo().getWebPort() + "/applications/" + appName);
+ HttpResponse response = hc.execute(get);
+ InputStream is = response.getEntity().getContent();
+ OutputStream os = appCtx.getHarOutputStream();
+ try {
+ IOUtils.copyLarge(is, os);
+ } finally {
+ os.close();
+ is.close();
+ }
+ }
+ appCtx.initializeClassPath();
+ appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
+ appCtx.initialize();
+ ncs.getClusterController()
+ .notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.INITIALIZED);
+ } catch (Exception e) {
+ LOGGER.warning("Error creating application: " + e.getMessage());
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
new file mode 100644
index 0000000..cfe00f6
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+public class DestroyApplicationWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(DestroyApplicationWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ private final String appName;
+
+ public DestroyApplicationWork(NodeControllerService ncs, String appName) {
+ this.ncs = ncs;
+ this.appName = appName;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ Map<String, NCApplicationContext> applications = ncs.getApplications();
+ ApplicationContext appCtx = applications.remove(appName);
+ if (appCtx != null) {
+ appCtx.deinitialize();
+ }
+ } catch (Exception e) {
+ LOGGER.warning("Error destroying application: " + e.getMessage());
+ }
+ ncs.getClusterController().notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.DEINITIALIZED);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java
new file mode 100644
index 0000000..022c92f
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class NotifyTaskCompleteWork extends AbstractWork {
+ private final NodeControllerService ncs;
+ private final Task task;
+
+ public NotifyTaskCompleteWork(NodeControllerService ncs, Task task) {
+ this.ncs = ncs;
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
+ task.dumpProfile(taskProfile);
+ try {
+ ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
+ ncs.getId(), taskProfile);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ task.getJoblet().removeTask(task);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
new file mode 100644
index 0000000..3957934
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class NotifyTaskFailureWork extends AbstractWork {
+ private final NodeControllerService ncs;
+ private final Task task;
+ private final String details;
+
+ public NotifyTaskFailureWork(NodeControllerService ncs, Task task, String details) {
+ this.ncs = ncs;
+ this.task = task;
+ this.details = details;
+ }
+
+ @Override
+ public void run() {
+ try {
+ ncs.getClusterController().notifyTaskFailure(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
+ ncs.getId(), details);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ task.getJoblet().removeTask(task);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
new file mode 100644
index 0000000..9734567
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
+
+public class ReportPartitionAvailabilityWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(ReportPartitionAvailabilityWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ private final PartitionId pid;
+
+ private final NetworkAddress networkAddress;
+
+ public ReportPartitionAvailabilityWork(NodeControllerService ncs, PartitionId pid, NetworkAddress networkAddress) {
+ this.ncs = ncs;
+ this.pid = pid;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet ji = jobletMap.get(pid.getJobId());
+ if (ji != null) {
+ PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
+ ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
+ .getIpAddress()), networkAddress.getPort()), pid, 5));
+ ji.reportPartitionAvailability(channel);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
new file mode 100644
index 0000000..a101612
--- /dev/null
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
+import edu.uci.ics.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
+import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
+import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
+import edu.uci.ics.hyracks.control.nc.profiling.ProfilingPartitionWriterFactory;
+
+public class StartTasksWork extends SynchronizableWork {
+ private static final Logger LOGGER = Logger.getLogger(StartTasksWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ private final String appName;
+
+ private final JobId jobId;
+
+ private final byte[] jagBytes;
+
+ private final List<TaskAttemptDescriptor> taskDescriptors;
+
+ private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap;
+
+ public StartTasksWork(NodeControllerService ncs, String appName, JobId jobId, byte[] jagBytes,
+ List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) {
+ this.ncs = ncs;
+ this.appName = appName;
+ this.jobId = jobId;
+ this.jagBytes = jagBytes;
+ this.taskDescriptors = taskDescriptors;
+ this.connectorPoliciesMap = connectorPoliciesMap;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ Map<String, NCApplicationContext> applications = ncs.getApplications();
+ NCApplicationContext appCtx = applications.get(appName);
+ final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, jagBytes == null ? null
+ : (JobActivityGraph) appCtx.deserialize(jagBytes));
+ final JobActivityGraph jag = joblet.getJobActivityGraph();
+
+ IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
+ @Override
+ public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex) {
+ return jag.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
+ }
+
+ @Override
+ public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex) {
+ return jag.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
+ }
+ };
+
+ for (TaskAttemptDescriptor td : taskDescriptors) {
+ TaskAttemptId taId = td.getTaskAttemptId();
+ TaskId tid = taId.getTaskId();
+ IActivity han = jag.getActivityNodeMap().get(tid.getActivityId());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Initializing " + taId + " -> " + han);
+ }
+ final int partition = tid.getPartition();
+ Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor());
+ IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
+
+ List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
+
+ List<IConnectorDescriptor> inputs = jag.getActivityInputConnectorDescriptors(tid.getActivityId());
+ if (inputs != null) {
+ for (int i = 0; i < inputs.size(); ++i) {
+ IConnectorDescriptor conn = inputs.get(i);
+ IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("input: " + i + ": " + conn.getConnectorId());
+ }
+ RecordDescriptor recordDesc = jag.getJobSpecification().getConnectorRecordDescriptor(conn);
+ IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
+ recordDesc, cPolicy);
+ collectors.add(collector);
+ }
+ }
+ List<IConnectorDescriptor> outputs = jag.getActivityOutputConnectorDescriptors(tid.getActivityId());
+ if (outputs != null) {
+ for (int i = 0; i < outputs.size(); ++i) {
+ final IConnectorDescriptor conn = outputs.get(i);
+ RecordDescriptor recordDesc = jag.getJobSpecification().getConnectorRecordDescriptor(conn);
+ IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
+
+ IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
+ partition, taId, jag.getJobFlags());
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("output: " + i + ": " + conn.getConnectorId());
+ }
+ IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
+ td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+ operator.setOutputFrameWriter(i, writer, recordDesc);
+ }
+ }
+
+ task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
+ joblet.addTask(task);
+
+ task.start();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx, JobActivityGraph jag)
+ throws Exception {
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet ji = jobletMap.get(jobId);
+ if (ji == null) {
+ if (jag == null) {
+ throw new NullPointerException("JobActivityGraph was null");
+ }
+ ji = new Joblet(ncs, jobId, appCtx, jag);
+ IJobletEventListenerFactory jelf = jag.getJobSpecification().getJobletEventListenerFactory();
+ if (jelf != null) {
+ IJobletEventListener listener = jelf.createListener(ji);
+ ji.setJobletEventListener(listener);
+ listener.jobletStart();
+ }
+ jobletMap.put(jobId, ji);
+ }
+ return ji;
+ }
+
+ private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
+ int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
+ throws HyracksDataException {
+ IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+ td.getInputPartitionCounts()[i], td.getPartitionCount());
+ if (cPolicy.materializeOnReceiveSide()) {
+ return new ReceiveSideMaterializingCollector(ncs.getRootContext(), ncs.getPartitionManager(), collector,
+ task.getTaskAttemptId(), ncs.getExecutor());
+ } else {
+ return collector;
+ }
+ }
+
+ private IPartitionWriterFactory createPartitionWriterFactory(IHyracksTaskContext ctx, IConnectorPolicy cPolicy,
+ final JobId jobId, final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId,
+ EnumSet<JobFlag> flags) {
+ IPartitionWriterFactory factory;
+ if (cPolicy.materializeOnSendSide()) {
+ if (cPolicy.consumerWaitsForProducerToFinish()) {
+ factory = new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new MaterializedPartitionWriter(ncs.getRootContext(), ncs.getPartitionManager(),
+ new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+ ncs.getExecutor());
+ }
+ };
+ } else {
+ factory = new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new MaterializingPipelinedPartition(ncs.getRootContext(), ncs.getPartitionManager(),
+ new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
+ ncs.getExecutor());
+ }
+ };
+ }
+ } else {
+ factory = new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new PipelinedPartition(ncs.getPartitionManager(), new PartitionId(jobId,
+ conn.getConnectorId(), senderIndex, receiverIndex), taId);
+ }
+ };
+ }
+ if (flags.contains(JobFlag.PROFILE_RUNTIME)) {
+ factory = new ProfilingPartitionWriterFactory(ctx, conn, senderIndex, factory);
+ }
+ return factory;
+ }
+}
\ No newline at end of file