Merged fullstack_staging branch into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
new file mode 100644
index 0000000..c92ff27
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -0,0 +1,53 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-control-nc</artifactId>
+ <name>hyracks-control-nc</name>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.dcache</groupId>
+ <artifactId>dcache-client</artifactId>
+ <version>0.0.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-common</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-net</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.0.1</version>
+ </plugin>
+ </plugins>
+ </reporting>
+</project>
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
new file mode 100644
index 0000000..3855b4d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.IGlobalJobDataFactory;
+import edu.uci.ics.hyracks.api.job.IJobletEventListener;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+
+public class Joblet implements IHyracksJobletContext, ICounterContext {
+ private final NodeControllerService nodeController;
+
+ private final INCApplicationContext appCtx;
+
+ private final JobId jobId;
+
+ private final ActivityClusterGraph acg;
+
+ private final Map<PartitionId, IPartitionCollector> partitionRequestMap;
+
+ private final IOperatorEnvironment env;
+
+ private final Map<Object, IStateObject> stateObjectMap;
+
+ private final Map<TaskAttemptId, Task> taskMap;
+
+ private final Map<String, Counter> counterMap;
+
+ private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+ private final IWorkspaceFileFactory fileFactory;
+
+ private final Object globalJobData;
+
+ private final IJobletEventListener jobletEventListener;
+
+ private final int frameSize;
+
+ private JobStatus cleanupStatus;
+
+ private boolean cleanupPending;
+
+ public Joblet(NodeControllerService nodeController, JobId jobId, INCApplicationContext appCtx,
+ ActivityClusterGraph acg) {
+ this.nodeController = nodeController;
+ this.appCtx = appCtx;
+ this.jobId = jobId;
+ this.frameSize = acg.getFrameSize();
+ this.acg = acg;
+ partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
+ env = new OperatorEnvironmentImpl(nodeController.getId());
+ stateObjectMap = new HashMap<Object, IStateObject>();
+ taskMap = new HashMap<TaskAttemptId, Task>();
+ counterMap = new HashMap<String, Counter>();
+ deallocatableRegistry = new DefaultDeallocatableRegistry();
+ fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
+ cleanupPending = false;
+ IJobletEventListenerFactory jelf = acg.getJobletEventListenerFactory();
+ if (jelf != null) {
+ IJobletEventListener listener = jelf.createListener(this);
+ this.jobletEventListener = listener;
+ listener.jobletStart();
+ } else {
+ jobletEventListener = null;
+ }
+ IGlobalJobDataFactory gjdf = acg.getGlobalJobDataFactory();
+ globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
+ }
+
+ @Override
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ActivityClusterGraph getActivityClusterGraph() {
+ return acg;
+ }
+
+ public IOperatorEnvironment getEnvironment() {
+ return env;
+ }
+
+ public void addTask(Task task) {
+ taskMap.put(task.getTaskAttemptId(), task);
+ }
+
+ public void removeTask(Task task) {
+ taskMap.remove(task.getTaskAttemptId());
+ if (cleanupPending && taskMap.isEmpty()) {
+ performCleanup();
+ }
+ }
+
+ public Map<TaskAttemptId, Task> getTaskMap() {
+ return taskMap;
+ }
+
+ private final class OperatorEnvironmentImpl implements IOperatorEnvironment {
+ private final String nodeId;
+
+ public OperatorEnvironmentImpl(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public String toString() {
+ return super.toString() + "@" + nodeId;
+ }
+
+ @Override
+ public synchronized void setStateObject(IStateObject taskState) {
+ stateObjectMap.put(taskState.getId(), taskState);
+ }
+
+ @Override
+ public synchronized IStateObject getStateObject(Object id) {
+ return stateObjectMap.get(id);
+ }
+ }
+
+ public NodeControllerService getNodeController() {
+ return nodeController;
+ }
+
+ public void dumpProfile(JobletProfile jProfile) {
+ Map<String, Long> counters = jProfile.getCounters();
+ for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
+ counters.put(e.getKey(), e.getValue().get());
+ }
+ for (Task task : taskMap.values()) {
+ TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(),
+ new Hashtable<PartitionId, PartitionProfile>(task.getPartitionSendProfile()));
+ task.dumpProfile(taskProfile);
+ jProfile.getTaskProfiles().put(task.getTaskAttemptId(), taskProfile);
+ }
+ }
+
+ @Override
+ public INCApplicationContext getApplicationContext() {
+ return appCtx;
+ }
+
+ @Override
+ public ICounterContext getCounterContext() {
+ return this;
+ }
+
+ @Override
+ public void registerDeallocatable(IDeallocatable deallocatable) {
+ deallocatableRegistry.registerDeallocatable(deallocatable);
+ }
+
+ public void close() {
+ nodeController.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ deallocatableRegistry.close();
+ }
+ });
+ }
+
+ ByteBuffer allocateFrame() {
+ return ByteBuffer.allocate(getFrameSize());
+ }
+
+ int getFrameSize() {
+ return frameSize;
+ }
+
+ IIOManager getIOManager() {
+ return appCtx.getRootContext().getIOManager();
+ }
+
+ @Override
+ public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
+ return fileFactory.createManagedWorkspaceFile(prefix);
+ }
+
+ @Override
+ public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
+ return fileFactory.createUnmanagedWorkspaceFile(prefix);
+ }
+
+ @Override
+ public synchronized ICounter getCounter(String name, boolean create) {
+ Counter counter = counterMap.get(name);
+ if (counter == null && create) {
+ counter = new Counter(name);
+ counterMap.put(name, counter);
+ }
+ return counter;
+ }
+
+ @Override
+ public Object getGlobalJobData() {
+ return globalJobData;
+ }
+
+ public synchronized void advertisePartitionRequest(TaskAttemptId taId, Collection<PartitionId> pids,
+ IPartitionCollector collector, PartitionState minState) throws Exception {
+ for (PartitionId pid : pids) {
+ partitionRequestMap.put(pid, collector);
+ PartitionRequest req = new PartitionRequest(pid, nodeController.getId(), taId, minState);
+ nodeController.getClusterController().registerPartitionRequest(req);
+ }
+ }
+
+ public synchronized void reportPartitionAvailability(PartitionChannel channel) throws HyracksException {
+ IPartitionCollector collector = partitionRequestMap.get(channel.getPartitionId());
+ if (collector != null) {
+ collector.addPartitions(Collections.singleton(channel));
+ }
+ }
+
+ public IJobletEventListener getJobletEventListener() {
+ return jobletEventListener;
+ }
+
+ public void cleanup(JobStatus status) {
+ cleanupStatus = status;
+ cleanupPending = true;
+ if (taskMap.isEmpty()) {
+ performCleanup();
+ }
+ }
+
+ private void performCleanup() {
+ nodeController.getJobletMap().remove(jobId);
+ IJobletEventListener listener = getJobletEventListener();
+ if (listener != null) {
+ listener.jobletFinish(cleanupStatus);
+ }
+ close();
+ cleanupPending = false;
+ try {
+ nodeController.getClusterController().notifyJobletCleanup(jobId, nodeController.getId());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
new file mode 100644
index 0000000..dde7abc
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import org.kohsuke.args4j.CmdLineParser;
+
+import edu.uci.ics.dcache.client.DCacheClient;
+import edu.uci.ics.dcache.client.DCacheClientConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+
+public class NCDriver {
+ public static void main(String args[]) throws Exception {
+ NCConfig ncConfig = new NCConfig();
+ CmdLineParser cp = new CmdLineParser(ncConfig);
+ try {
+ cp.parseArgument(args);
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ cp.printUsage(System.err);
+ return;
+ }
+
+ DCacheClientConfig dccConfig = new DCacheClientConfig();
+ dccConfig.servers = ncConfig.dcacheClientServers;
+ dccConfig.serverLocal = ncConfig.dcacheClientServerLocal;
+ dccConfig.path = ncConfig.dcacheClientPath;
+
+ DCacheClient.get().init(dccConfig);
+
+ final NodeControllerService nService = new NodeControllerService(ncConfig);
+ nService.start();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ nService.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ while (true) {
+ Thread.sleep(10000);
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
new file mode 100644
index 0000000..0195143
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -0,0 +1,465 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import java.io.File;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadMXBean;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.WorkQueue;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
+import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
+import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
+import edu.uci.ics.hyracks.control.nc.work.AbortTasksWork;
+import edu.uci.ics.hyracks.control.nc.work.ApplicationMessageWork;
+import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
+import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
+import edu.uci.ics.hyracks.control.nc.work.CreateApplicationWork;
+import edu.uci.ics.hyracks.control.nc.work.DestroyApplicationWork;
+import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
+import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+import edu.uci.ics.hyracks.ipc.api.IPCPerformanceCounters;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class NodeControllerService extends AbstractRemoteService {
+ private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
+
+ private NCConfig ncConfig;
+
+ private final String id;
+
+ private final IHyracksRootContext ctx;
+
+ private final IPCSystem ipc;
+
+ private final PartitionManager partitionManager;
+
+ private final NetworkManager netManager;
+
+ private final WorkQueue queue;
+
+ private final Timer timer;
+
+ private boolean registrationPending;
+
+ private Exception registrationException;
+
+ private IClusterController ccs;
+
+ private final Map<JobId, Joblet> jobletMap;
+
+ private final ExecutorService executor;
+
+ private NodeParameters nodeParameters;
+
+ private HeartbeatTask heartbeatTask;
+
+ private final ServerContext serverCtx;
+
+ private final Map<String, NCApplicationContext> applications;
+
+ private final MemoryMXBean memoryMXBean;
+
+ private final List<GarbageCollectorMXBean> gcMXBeans;
+
+ private final ThreadMXBean threadMXBean;
+
+ private final RuntimeMXBean runtimeMXBean;
+
+ private final OperatingSystemMXBean osMXBean;
+
+ private final Mutable<FutureValue<Map<String, NodeControllerInfo>>> getNodeControllerInfosAcceptor;
+
+ public NodeControllerService(NCConfig ncConfig) throws Exception {
+ this.ncConfig = ncConfig;
+ id = ncConfig.nodeId;
+ executor = Executors.newCachedThreadPool();
+ NodeControllerIPCI ipci = new NodeControllerIPCI();
+ ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
+ new CCNCFunctions.SerializerDeserializer());
+ this.ctx = new RootHyracksContext(this, new IOManager(getDevices(ncConfig.ioDevices), executor));
+ if (id == null) {
+ throw new Exception("id not set");
+ }
+ partitionManager = new PartitionManager(this);
+ netManager = new NetworkManager(getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
+
+ queue = new WorkQueue();
+ jobletMap = new Hashtable<JobId, Joblet>();
+ timer = new Timer(true);
+ serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
+ NodeControllerService.class.getName()), id));
+ applications = new Hashtable<String, NCApplicationContext>();
+ memoryMXBean = ManagementFactory.getMemoryMXBean();
+ gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
+ threadMXBean = ManagementFactory.getThreadMXBean();
+ runtimeMXBean = ManagementFactory.getRuntimeMXBean();
+ osMXBean = ManagementFactory.getOperatingSystemMXBean();
+ registrationPending = true;
+ getNodeControllerInfosAcceptor = new MutableObject<FutureValue<Map<String, NodeControllerInfo>>>();
+ }
+
+ public IHyracksRootContext getRootContext() {
+ return ctx;
+ }
+
+ private static List<IODeviceHandle> getDevices(String ioDevices) {
+ List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
+ StringTokenizer tok = new StringTokenizer(ioDevices, ",");
+ while (tok.hasMoreElements()) {
+ String devPath = tok.nextToken().trim();
+ devices.add(new IODeviceHandle(new File(devPath), "."));
+ }
+ return devices;
+ }
+
+ private synchronized void setNodeRegistrationResult(NodeParameters parameters, Exception exception) {
+ this.nodeParameters = parameters;
+ this.registrationException = exception;
+ this.registrationPending = false;
+ notifyAll();
+ }
+
+ public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+ FutureValue<Map<String, NodeControllerInfo>> fv = new FutureValue<Map<String, NodeControllerInfo>>();
+ synchronized (getNodeControllerInfosAcceptor) {
+ while (getNodeControllerInfosAcceptor.getValue() != null) {
+ getNodeControllerInfosAcceptor.wait();
+ }
+ getNodeControllerInfosAcceptor.setValue(fv);
+ }
+ ccs.getNodeControllerInfos();
+ return fv.get();
+ }
+
+ private void setNodeControllersInfo(Map<String, NodeControllerInfo> ncInfos) {
+ FutureValue<Map<String, NodeControllerInfo>> fv;
+ synchronized (getNodeControllerInfosAcceptor) {
+ fv = getNodeControllerInfosAcceptor.getValue();
+ getNodeControllerInfosAcceptor.setValue(null);
+ getNodeControllerInfosAcceptor.notifyAll();
+ }
+ fv.setValue(ncInfos);
+ }
+
+ @Override
+ public void start() throws Exception {
+ LOGGER.log(Level.INFO, "Starting NodeControllerService");
+ ipc.start();
+ netManager.start();
+ IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
+ this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
+ HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
+ for (int i = 0; i < gcInfos.length; ++i) {
+ gcInfos[i] = new HeartbeatSchema.GarbageCollectorInfo(gcMXBeans.get(i).getName());
+ }
+ HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
+ ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
+ osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+ runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
+ .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+ runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
+
+ synchronized (this) {
+ while (registrationPending) {
+ wait();
+ }
+ }
+ if (registrationException != null) {
+ throw registrationException;
+ }
+
+ queue.start();
+
+ heartbeatTask = new HeartbeatTask(ccs);
+
+ // Schedule heartbeat generator.
+ timer.schedule(heartbeatTask, 0, nodeParameters.getHeartbeatPeriod());
+
+ if (nodeParameters.getProfileDumpPeriod() > 0) {
+ // Schedule profile dump generator.
+ timer.schedule(new ProfileDumpTask(ccs), 0, nodeParameters.getProfileDumpPeriod());
+ }
+
+ LOGGER.log(Level.INFO, "Started NodeControllerService");
+ }
+
+ @Override
+ public void stop() throws Exception {
+ LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+ executor.shutdownNow();
+ partitionManager.close();
+ heartbeatTask.cancel();
+ netManager.stop();
+ queue.stop();
+ LOGGER.log(Level.INFO, "Stopped NodeControllerService");
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public ServerContext getServerContext() {
+ return serverCtx;
+ }
+
+ public Map<String, NCApplicationContext> getApplications() {
+ return applications;
+ }
+
+ public Map<JobId, Joblet> getJobletMap() {
+ return jobletMap;
+ }
+
+ public NetworkManager getNetworkManager() {
+ return netManager;
+ }
+
+ public PartitionManager getPartitionManager() {
+ return partitionManager;
+ }
+
+ public IClusterController getClusterController() {
+ return ccs;
+ }
+
+ public NodeParameters getNodeParameters() {
+ return nodeParameters;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public NCConfig getConfiguration() {
+ return ncConfig;
+ }
+
+ public WorkQueue getWorkQueue() {
+ return queue;
+ }
+
+ private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
+ String ipaddrStr = ncConfig.dataIPAddress;
+ ipaddrStr = ipaddrStr.trim();
+ Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
+ Matcher m = pattern.matcher(ipaddrStr);
+ if (!m.matches()) {
+ throw new Exception(MessageFormat.format(
+ "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
+ }
+ byte[] ipBytes = new byte[4];
+ ipBytes[0] = (byte) Integer.parseInt(m.group(1));
+ ipBytes[1] = (byte) Integer.parseInt(m.group(2));
+ ipBytes[2] = (byte) Integer.parseInt(m.group(3));
+ ipBytes[3] = (byte) Integer.parseInt(m.group(4));
+ return InetAddress.getByAddress(ipBytes);
+ }
+
+ private class HeartbeatTask extends TimerTask {
+ private IClusterController cc;
+
+ private final HeartbeatData hbData;
+
+ public HeartbeatTask(IClusterController cc) {
+ this.cc = cc;
+ hbData = new HeartbeatData();
+ hbData.gcCollectionCounts = new long[gcMXBeans.size()];
+ hbData.gcCollectionTimes = new long[gcMXBeans.size()];
+ }
+
+ @Override
+ public void run() {
+ MemoryUsage heapUsage = memoryMXBean.getHeapMemoryUsage();
+ hbData.heapInitSize = heapUsage.getInit();
+ hbData.heapUsedSize = heapUsage.getUsed();
+ hbData.heapCommittedSize = heapUsage.getCommitted();
+ hbData.heapMaxSize = heapUsage.getMax();
+ MemoryUsage nonheapUsage = memoryMXBean.getNonHeapMemoryUsage();
+ hbData.nonheapInitSize = nonheapUsage.getInit();
+ hbData.nonheapUsedSize = nonheapUsage.getUsed();
+ hbData.nonheapCommittedSize = nonheapUsage.getCommitted();
+ hbData.nonheapMaxSize = nonheapUsage.getMax();
+ hbData.threadCount = threadMXBean.getThreadCount();
+ hbData.peakThreadCount = threadMXBean.getPeakThreadCount();
+ hbData.totalStartedThreadCount = threadMXBean.getTotalStartedThreadCount();
+ hbData.systemLoadAverage = osMXBean.getSystemLoadAverage();
+ int gcN = gcMXBeans.size();
+ for (int i = 0; i < gcN; ++i) {
+ GarbageCollectorMXBean gcMXBean = gcMXBeans.get(i);
+ hbData.gcCollectionCounts[i] = gcMXBean.getCollectionCount();
+ hbData.gcCollectionTimes[i] = gcMXBean.getCollectionTime();
+ }
+
+ MuxDemuxPerformanceCounters netPC = netManager.getPerformanceCounters();
+ hbData.netPayloadBytesRead = netPC.getPayloadBytesRead();
+ hbData.netPayloadBytesWritten = netPC.getPayloadBytesWritten();
+ hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
+ hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
+
+ IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
+ hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
+ hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
+ hbData.ipcMessagesReceived = ipcPC.getMessageReceivedCount();
+ hbData.ipcMessageBytesReceived = ipcPC.getMessageBytesReceived();
+
+ try {
+ cc.nodeHeartbeat(id, hbData);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private class ProfileDumpTask extends TimerTask {
+ private IClusterController cc;
+
+ public ProfileDumpTask(IClusterController cc) {
+ this.cc = cc;
+ }
+
+ @Override
+ public void run() {
+ try {
+ FutureValue<List<JobProfile>> fv = new FutureValue<List<JobProfile>>();
+ BuildJobProfilesWork bjpw = new BuildJobProfilesWork(NodeControllerService.this, fv);
+ queue.scheduleAndSync(bjpw);
+ List<JobProfile> profiles = fv.get();
+ if (!profiles.isEmpty()) {
+ cc.reportProfile(id, profiles);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private final class NodeControllerIPCI implements IIPCI {
+ @Override
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+ CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
+ switch (fn.getFunctionId()) {
+ case SEND_APPLICATION_MESSAGE: {
+ CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+ queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf
+ .getAppName(), amf.getNodeId()));
+ return;
+ }
+ case START_TASKS: {
+ CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
+ queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getAppName(), stf.getJobId(), stf
+ .getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
+ return;
+ }
+
+ case ABORT_TASKS: {
+ CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn;
+ queue.schedule(new AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
+ return;
+ }
+
+ case CLEANUP_JOBLET: {
+ CCNCFunctions.CleanupJobletFunction cjf = (CCNCFunctions.CleanupJobletFunction) fn;
+ queue.schedule(new CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
+ return;
+ }
+
+ case CREATE_APPLICATION: {
+ CCNCFunctions.CreateApplicationFunction caf = (CCNCFunctions.CreateApplicationFunction) fn;
+ queue.schedule(new CreateApplicationWork(NodeControllerService.this, caf.getAppName(), caf
+ .isDeployHar(), caf.getSerializedDistributedState()));
+ return;
+ }
+
+ case DESTROY_APPLICATION: {
+ CCNCFunctions.DestroyApplicationFunction daf = (CCNCFunctions.DestroyApplicationFunction) fn;
+ queue.schedule(new DestroyApplicationWork(NodeControllerService.this, daf.getAppName()));
+ return;
+ }
+
+ case REPORT_PARTITION_AVAILABILITY: {
+ CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
+ queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
+ .getPartitionId(), rpaf.getNetworkAddress()));
+ return;
+ }
+
+ case NODE_REGISTRATION_RESULT: {
+ CCNCFunctions.NodeRegistrationResult nrrf = (CCNCFunctions.NodeRegistrationResult) fn;
+ setNodeRegistrationResult(nrrf.getNodeParameters(), nrrf.getException());
+ return;
+ }
+
+ case GET_NODE_CONTROLLERS_INFO_RESPONSE: {
+ CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+ setNodeControllersInfo(gncirf.getNodeControllerInfos());
+ return;
+ }
+ }
+ throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+
+ }
+ }
+
+ public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
+ ccs.sendApplicationMessageToCC(data, appName, nodeId);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
new file mode 100644
index 0000000..eba3ec9
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -0,0 +1,355 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.Counter;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+import edu.uci.ics.hyracks.control.nc.work.NotifyTaskCompleteWork;
+import edu.uci.ics.hyracks.control.nc.work.NotifyTaskFailureWork;
+
+public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
+ private final Joblet joblet;
+
+ private final TaskAttemptId taskAttemptId;
+
+ private final String displayName;
+
+ private final Executor executor;
+
+ private final IWorkspaceFileFactory fileFactory;
+
+ private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+ private final Map<String, Counter> counterMap;
+
+ private final IOperatorEnvironment opEnv;
+
+ private final Map<PartitionId, PartitionProfile> partitionSendProfile;
+
+ private final Set<Thread> pendingThreads;
+
+ private IPartitionCollector[] collectors;
+
+ private IOperatorNodePushable operator;
+
+ private volatile boolean failed;
+
+ private ByteArrayOutputStream errorBaos;
+
+ private PrintWriter errorWriter;
+
+ private volatile boolean aborted;
+
+ private NodeControllerService ncs;
+
+ public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor, NodeControllerService ncs) {
+ this.joblet = joblet;
+ this.taskAttemptId = taskId;
+ this.displayName = displayName;
+ this.executor = executor;
+ fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
+ deallocatableRegistry = new DefaultDeallocatableRegistry();
+ counterMap = new HashMap<String, Counter>();
+ opEnv = joblet.getEnvironment();
+ partitionSendProfile = new Hashtable<PartitionId, PartitionProfile>();
+ pendingThreads = new LinkedHashSet<Thread>();
+ failed = false;
+ errorBaos = new ByteArrayOutputStream();
+ errorWriter = new PrintWriter(errorBaos, true);
+ this.ncs = ncs;
+ }
+
+ public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
+ this.collectors = collectors;
+ this.operator = operator;
+ }
+
+ @Override
+ public ByteBuffer allocateFrame() {
+ return joblet.allocateFrame();
+ }
+
+ @Override
+ public int getFrameSize() {
+ return joblet.getFrameSize();
+ }
+
+ @Override
+ public IIOManager getIOManager() {
+ return joblet.getIOManager();
+ }
+
+ @Override
+ public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
+ return fileFactory.createUnmanagedWorkspaceFile(prefix);
+ }
+
+ @Override
+ public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
+ return fileFactory.createManagedWorkspaceFile(prefix);
+ }
+
+ @Override
+ public void registerDeallocatable(IDeallocatable deallocatable) {
+ deallocatableRegistry.registerDeallocatable(deallocatable);
+ }
+
+ public void close() {
+ deallocatableRegistry.close();
+ }
+
+ @Override
+ public IHyracksJobletContext getJobletContext() {
+ return joblet;
+ }
+
+ @Override
+ public TaskAttemptId getTaskAttemptId() {
+ return taskAttemptId;
+ }
+
+ @Override
+ public ICounter getCounter(String name, boolean create) {
+ Counter counter = counterMap.get(name);
+ if (counter == null && create) {
+ counter = new Counter(name);
+ counterMap.put(name, counter);
+ }
+ return counter;
+ }
+
+ @Override
+ public ICounterContext getCounterContext() {
+ return this;
+ }
+
+ public Joblet getJoblet() {
+ return joblet;
+ }
+
+ public Map<PartitionId, PartitionProfile> getPartitionSendProfile() {
+ return partitionSendProfile;
+ }
+
+ public synchronized void dumpProfile(TaskProfile tProfile) {
+ Map<String, Long> dumpMap = tProfile.getCounters();
+ for (Counter c : counterMap.values()) {
+ dumpMap.put(c.getName(), c.get());
+ }
+ }
+
+ public void setPartitionSendProfile(PartitionProfile profile) {
+ partitionSendProfile.put(profile.getPartitionId(), profile);
+ }
+
+ public void start() throws HyracksException {
+ aborted = false;
+ executor.execute(this);
+ }
+
+ public synchronized void abort() {
+ aborted = true;
+ for (IPartitionCollector c : collectors) {
+ c.abort();
+ }
+ for (Thread t : pendingThreads) {
+ t.interrupt();
+ }
+ }
+
+ private synchronized void addPendingThread(Thread t) {
+ pendingThreads.add(t);
+ }
+
+ private synchronized void removePendingThread(Thread t) {
+ pendingThreads.remove(t);
+ if (pendingThreads.isEmpty()) {
+ notifyAll();
+ }
+ }
+
+ public synchronized void waitForCompletion() throws InterruptedException {
+ while (!pendingThreads.isEmpty()) {
+ wait();
+ }
+ }
+
+ @Override
+ public void run() {
+ Thread ct = Thread.currentThread();
+ String threadName = ct.getName();
+ addPendingThread(ct);
+ try {
+ ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
+ operator.initialize();
+ try {
+ if (collectors.length > 0) {
+ final Semaphore sem = new Semaphore(collectors.length - 1);
+ for (int i = 1; i < collectors.length; ++i) {
+ final IPartitionCollector collector = collectors[i];
+ final IFrameWriter writer = operator.getInputFrameWriter(i);
+ sem.acquire();
+ final int cIdx = i;
+ executor.execute(new Runnable() {
+ public void run() {
+ if (aborted) {
+ return;
+ }
+ Thread thread = Thread.currentThread();
+ addPendingThread(thread);
+ String oldName = thread.getName();
+ thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+ try {
+ pushFrames(collector, writer);
+ } catch (HyracksDataException e) {
+ synchronized (Task.this) {
+ failed = true;
+ errorWriter.println("Exception caught by thread: " + thread.getName());
+ e.printStackTrace(errorWriter);
+ errorWriter.println();
+ }
+ } finally {
+ thread.setName(oldName);
+ sem.release();
+ removePendingThread(thread);
+ }
+ }
+ });
+ }
+ try {
+ pushFrames(collectors[0], operator.getInputFrameWriter(0));
+ } finally {
+ sem.acquire(collectors.length - 1);
+ }
+ }
+ } finally {
+ operator.deinitialize();
+ }
+ NodeControllerService ncs = joblet.getNodeController();
+ ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
+ } catch (Exception e) {
+ failed = true;
+ errorWriter.println("Exception caught by thread: " + ct.getName());
+ e.printStackTrace(errorWriter);
+ errorWriter.println();
+ } finally {
+ ct.setName(threadName);
+ close();
+ removePendingThread(ct);
+ }
+ if (failed) {
+ errorWriter.close();
+ NodeControllerService ncs = joblet.getNodeController();
+ try {
+ ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, errorBaos.toString("UTF-8")));
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void pushFrames(IPartitionCollector collector, IFrameWriter writer) throws HyracksDataException {
+ if (aborted) {
+ return;
+ }
+ try {
+ collector.open();
+ try {
+ joblet.advertisePartitionRequest(taskAttemptId, collector.getRequiredPartitionIds(), collector,
+ PartitionState.STARTED);
+ IFrameReader reader = collector.getReader();
+ reader.open();
+ try {
+ writer.open();
+ try {
+ ByteBuffer buffer = allocateFrame();
+ while (reader.nextFrame(buffer)) {
+ if (aborted) {
+ return;
+ }
+ buffer.flip();
+ writer.nextFrame(buffer);
+ buffer.compact();
+ }
+ } catch (Exception e) {
+ writer.fail();
+ throw e;
+ } finally {
+ writer.close();
+ }
+ } finally {
+ reader.close();
+ }
+ } finally {
+ collector.close();
+ }
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void setStateObject(IStateObject taskState) {
+ opEnv.setStateObject(taskState);
+ }
+
+ @Override
+ public IStateObject getStateObject(Object id) {
+ return opEnv.getStateObject(id);
+ }
+
+ @Override
+ public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
+ this.ncs.sendApplicationMessageToCC(message, this.getJobletContext().getApplicationContext()
+ .getApplicationName(), nodeId);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
new file mode 100644
index 0000000..1121c6c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
@@ -0,0 +1,60 @@
+package edu.uci.ics.hyracks.control.nc.application;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCBootstrap;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
+
+public class NCApplicationContext extends ApplicationContext implements INCApplicationContext {
+ private final String nodeId;
+ private final IHyracksRootContext rootCtx;
+ private Object appObject;
+
+ public NCApplicationContext(ServerContext serverCtx, IHyracksRootContext rootCtx, String appName, String nodeId)
+ throws IOException {
+ super(serverCtx, appName);
+ this.nodeId = nodeId;
+ this.rootCtx = rootCtx;
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setDistributedState(Serializable state) {
+ distributedState = state;
+ }
+
+ @Override
+ protected void start() throws Exception {
+ ((INCBootstrap) bootstrap).setApplicationContext(this);
+ bootstrap.start();
+ }
+
+ @Override
+ protected void stop() throws Exception {
+ if (bootstrap != null) {
+ bootstrap.stop();
+ }
+ }
+
+ @Override
+ public IHyracksRootContext getRootContext() {
+ return rootCtx;
+ }
+
+ @Override
+ public void setApplicationObject(Object object) {
+ this.appObject = object;
+ }
+
+ @Override
+ public Object getApplicationObject() {
+ return appObject;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/FileHandle.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/FileHandle.java
new file mode 100644
index 0000000..8916fb1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/FileHandle.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+
+public class FileHandle implements IFileHandle {
+ private final FileReference fileRef;
+
+ private RandomAccessFile raf;
+
+ private FileChannel channel;
+
+ public FileHandle(FileReference fileRef) {
+ this.fileRef = fileRef;
+ }
+
+ public void open(IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode) throws IOException {
+ String mode;
+ switch (rwMode) {
+ case READ_ONLY:
+ mode = "r";
+ break;
+
+ case READ_WRITE:
+ fileRef.getFile().getAbsoluteFile().getParentFile().mkdirs();
+ switch (syncMode) {
+ case METADATA_ASYNC_DATA_ASYNC:
+ mode = "rw";
+ break;
+
+ case METADATA_ASYNC_DATA_SYNC:
+ mode = "rwd";
+ break;
+
+ case METADATA_SYNC_DATA_SYNC:
+ mode = "rws";
+ break;
+
+ default:
+ throw new IllegalArgumentException();
+ }
+ break;
+
+ default:
+ throw new IllegalArgumentException();
+ }
+ raf = new RandomAccessFile(fileRef.getFile(), mode);
+ channel = raf.getChannel();
+ }
+
+ public void close() throws IOException {
+ channel.close();
+ raf.close();
+ }
+
+ public FileReference getFileReference() {
+ return fileRef;
+ }
+
+ public RandomAccessFile getRandomAccessFile() {
+ return raf;
+ }
+
+ public FileChannel getFileChannel() {
+ return channel;
+ }
+
+ public void sync(boolean metadata) throws IOException {
+ channel.force(metadata);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
new file mode 100644
index 0000000..3b13f32
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/IOManager.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOFuture;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+
+public class IOManager implements IIOManager {
+ private final List<IODeviceHandle> ioDevices;
+
+ private final Executor executor;
+
+ private final List<IODeviceHandle> workAreaIODevices;
+
+ private int workAreaDeviceIndex;
+
+ public IOManager(List<IODeviceHandle> devices, Executor executor) throws HyracksException {
+ this.ioDevices = Collections.unmodifiableList(devices);
+ this.executor = executor;
+ workAreaIODevices = new ArrayList<IODeviceHandle>();
+ for (IODeviceHandle d : ioDevices) {
+ if (d.getWorkAreaPath() != null) {
+ new File(d.getPath(), d.getWorkAreaPath()).mkdirs();
+ workAreaIODevices.add(d);
+ }
+ }
+ if (workAreaIODevices.isEmpty()) {
+ throw new HyracksException("No devices with work areas found");
+ }
+ workAreaDeviceIndex = 0;
+ }
+
+ @Override
+ public List<IODeviceHandle> getIODevices() {
+ return ioDevices;
+ }
+
+ @Override
+ public IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
+ throws HyracksDataException {
+ FileHandle fHandle = new FileHandle(fileRef);
+ try {
+ fHandle.open(rwMode, syncMode);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ return fHandle;
+ }
+
+ @Override
+ public int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ try {
+ int n = 0;
+ int remaining = data.remaining();
+ while (remaining > 0) {
+ int len = ((FileHandle) fHandle).getFileChannel().write(data, offset);
+ if (len < 0) {
+ throw new HyracksDataException("Error writing to file: "
+ + ((FileHandle) fHandle).getFileReference().toString());
+ }
+ remaining -= len;
+ offset += len;
+ n += len;
+ }
+ return n;
+ } catch (HyracksDataException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+ try {
+ int n = 0;
+ int remaining = data.remaining();
+ while (remaining > 0) {
+ int len = ((FileHandle) fHandle).getFileChannel().read(data, offset);
+ if (len < 0) {
+ return -1;
+ }
+ remaining -= len;
+ offset += len;
+ n += len;
+ }
+ return n;
+ } catch (HyracksDataException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public IIOFuture asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) {
+ AsyncWriteRequest req = new AsyncWriteRequest((FileHandle) fHandle, offset, data);
+ executor.execute(req);
+ return req;
+ }
+
+ @Override
+ public IIOFuture asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) {
+ AsyncReadRequest req = new AsyncReadRequest((FileHandle) fHandle, offset, data);
+ executor.execute(req);
+ return req;
+ }
+
+ @Override
+ public void close(IFileHandle fHandle) throws HyracksDataException {
+ try {
+ ((FileHandle) fHandle).close();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public synchronized FileReference createWorkspaceFile(String prefix) throws HyracksDataException {
+ IODeviceHandle dev = workAreaIODevices.get(workAreaDeviceIndex);
+ workAreaDeviceIndex = (workAreaDeviceIndex + 1) % workAreaIODevices.size();
+ String waPath = dev.getWorkAreaPath();
+ File waf;
+ try {
+ waf = File.createTempFile(prefix, ".waf", new File(dev.getPath(), waPath));
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ return dev.createFileReference(waPath + File.separator + waf.getName());
+ }
+
+ private abstract class AsyncRequest implements IIOFuture, Runnable {
+ protected final FileHandle fHandle;
+ protected final long offset;
+ protected final ByteBuffer data;
+ private boolean complete;
+ private HyracksDataException exception;
+ private int result;
+
+ private AsyncRequest(FileHandle fHandle, long offset, ByteBuffer data) {
+ this.fHandle = fHandle;
+ this.offset = offset;
+ this.data = data;
+ complete = false;
+ exception = null;
+ }
+
+ @Override
+ public void run() {
+ HyracksDataException hde = null;
+ int res = -1;
+ try {
+ res = performOperation();
+ } catch (HyracksDataException e) {
+ hde = e;
+ }
+ synchronized (this) {
+ exception = hde;
+ result = res;
+ complete = true;
+ notifyAll();
+ }
+ }
+
+ protected abstract int performOperation() throws HyracksDataException;
+
+ @Override
+ public synchronized int synchronize() throws HyracksDataException, InterruptedException {
+ while (!complete) {
+ wait();
+ }
+ if (exception != null) {
+ throw exception;
+ }
+ return result;
+ }
+
+ @Override
+ public synchronized boolean isComplete() {
+ return complete;
+ }
+ }
+
+ private class AsyncReadRequest extends AsyncRequest {
+ private AsyncReadRequest(FileHandle fHandle, long offset, ByteBuffer data) {
+ super(fHandle, offset, data);
+ }
+
+ @Override
+ protected int performOperation() throws HyracksDataException {
+ return syncRead(fHandle, offset, data);
+ }
+ }
+
+ private class AsyncWriteRequest extends AsyncRequest {
+ private AsyncWriteRequest(FileHandle fHandle, long offset, ByteBuffer data) {
+ super(fHandle, offset, data);
+ }
+
+ @Override
+ protected int performOperation() throws HyracksDataException {
+ return syncWrite(fHandle, offset, data);
+ }
+ }
+
+ @Override
+ public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
+ try {
+ ((FileHandle) fileHandle).sync(metadata);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/WorkspaceFileFactory.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/WorkspaceFileFactory.java
new file mode 100644
index 0000000..16421ab
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/io/WorkspaceFileFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.io;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
+
+public final class WorkspaceFileFactory implements IWorkspaceFileFactory {
+ private final IDeallocatableRegistry registry;
+ private final IOManager ioManager;
+
+ public WorkspaceFileFactory(IDeallocatableRegistry registry, IOManager ioManager) {
+ this.registry = registry;
+ this.ioManager = ioManager;
+ }
+
+ @Override
+ public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException {
+ final FileReference fRef = ioManager.createWorkspaceFile(prefix);
+ registry.registerDeallocatable(new IDeallocatable() {
+ @Override
+ public void deallocate() {
+ fRef.delete();
+ }
+ });
+ return fRef;
+ }
+
+ @Override
+ public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
+ return ioManager.createWorkspaceFile(prefix);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
new file mode 100644
index 0000000..1d5af84
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class NetworkInputChannel implements IInputChannel {
+ private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
+
+ private final NetworkManager netManager;
+
+ private final SocketAddress remoteAddress;
+
+ private final PartitionId partitionId;
+
+ private final Queue<ByteBuffer> fullQueue;
+
+ private final int nBuffers;
+
+ private ChannelControlBlock ccb;
+
+ private IInputChannelMonitor monitor;
+
+ private Object attachment;
+
+ public NetworkInputChannel(NetworkManager netManager, SocketAddress remoteAddress, PartitionId partitionId,
+ int nBuffers) {
+ this.netManager = netManager;
+ this.remoteAddress = remoteAddress;
+ this.partitionId = partitionId;
+ fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+ this.nBuffers = nBuffers;
+ }
+
+ @Override
+ public void registerMonitor(IInputChannelMonitor monitor) {
+ this.monitor = monitor;
+ }
+
+ @Override
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ @Override
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ @Override
+ public synchronized ByteBuffer getNextBuffer() {
+ return fullQueue.poll();
+ }
+
+ @Override
+ public void recycleBuffer(ByteBuffer buffer) {
+ buffer.clear();
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
+ }
+
+ @Override
+ public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+ try {
+ ccb = netManager.connect(remoteAddress);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+ for (int i = 0; i < nBuffers; ++i) {
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
+ }
+ ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
+ writeBuffer.putLong(partitionId.getJobId().getId());
+ writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
+ writeBuffer.putInt(partitionId.getSenderIndex());
+ writeBuffer.putInt(partitionId.getReceiverIndex());
+ writeBuffer.flip();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Sending partition request: " + partitionId + " on channel: " + ccb);
+ }
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+
+ private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ fullQueue.add(buffer);
+ monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
+ }
+
+ @Override
+ public void close() {
+ monitor.notifyEndOfStream(NetworkInputChannel.this);
+ }
+
+ @Override
+ public void error(int ecode) {
+ monitor.notifyFailure(NetworkInputChannel.this);
+ }
+ }
+
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ // do nothing
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
new file mode 100644
index 0000000..b805595
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.exceptions.NetException;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class NetworkManager {
+ private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
+
+ private static final int MAX_CONNECTION_ATTEMPTS = 5;
+
+ static final int INITIAL_MESSAGE_SIZE = 20;
+
+ private final PartitionManager partitionManager;
+
+ private final MuxDemux md;
+
+ private NetworkAddress networkAddress;
+
+ public NetworkManager(InetAddress inetAddress, PartitionManager partitionManager, int nThreads) throws IOException {
+ this.partitionManager = partitionManager;
+ md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
+ MAX_CONNECTION_ATTEMPTS);
+ }
+
+ public void start() throws IOException {
+ md.start();
+ InetSocketAddress sockAddr = md.getLocalAddress();
+ networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+
+ public void stop() {
+
+ }
+
+ public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+ MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+ return mConn.openChannel();
+ }
+
+ private class ChannelOpenListener implements IChannelOpenListener {
+ @Override
+ public void channelOpened(ChannelControlBlock channel) {
+ channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+ channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
+ }
+ }
+
+ private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+ private final ChannelControlBlock ccb;
+
+ private NetworkOutputChannel noc;
+
+ public InitialBufferAcceptor(ChannelControlBlock ccb) {
+ this.ccb = ccb;
+ }
+
+ @Override
+ public void accept(ByteBuffer buffer) {
+ PartitionId pid = readInitialMessage(buffer);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Received initial partition request: " + pid + " on channel: " + ccb);
+ }
+ noc = new NetworkOutputChannel(ccb, 1);
+ try {
+ partitionManager.registerPartitionRequest(pid, noc);
+ } catch (HyracksException e) {
+ noc.abort();
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void error(int ecode) {
+ if (noc != null) {
+ noc.abort();
+ }
+ }
+ }
+
+ private static PartitionId readInitialMessage(ByteBuffer buffer) {
+ JobId jobId = new JobId(buffer.getLong());
+ ConnectorDescriptorId cdid = new ConnectorDescriptorId(buffer.getInt());
+ int senderIndex = buffer.getInt();
+ int receiverIndex = buffer.getInt();
+ return new PartitionId(jobId, cdid, senderIndex, receiverIndex);
+ }
+
+ public MuxDemuxPerformanceCounters getPerformanceCounters() {
+ return md.getPerformanceCounters();
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
new file mode 100644
index 0000000..9024e18
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class NetworkOutputChannel implements IFrameWriter {
+ private final ChannelControlBlock ccb;
+
+ private final int nBuffers;
+
+ private final Deque<ByteBuffer> emptyStack;
+
+ private boolean aborted;
+
+ public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
+ this.ccb = ccb;
+ this.nBuffers = nBuffers;
+ emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+ }
+
+ public void setTaskContext(IHyracksTaskContext ctx) {
+ for (int i = 0; i < nBuffers; ++i) {
+ emptyStack.push(ByteBuffer.allocateDirect(ctx.getFrameSize()));
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ByteBuffer destBuffer = null;
+ synchronized (this) {
+ while (true) {
+ if (aborted) {
+ throw new HyracksDataException("Connection has been aborted");
+ }
+ destBuffer = emptyStack.poll();
+ if (destBuffer != null) {
+ break;
+ }
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ buffer.position(0);
+ buffer.limit(destBuffer.capacity());
+ destBuffer.clear();
+ destBuffer.put(buffer);
+ destBuffer.flip();
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
+ }
+
+ void abort() {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ synchronized (NetworkOutputChannel.this) {
+ aborted = true;
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
+
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ synchronized (NetworkOutputChannel.this) {
+ emptyStack.push(buffer);
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
new file mode 100644
index 0000000..ae4fd2b
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializedPartition implements IPartition {
+ private final IHyracksTaskContext ctx;
+
+ private final FileReference partitionFile;
+
+ private final Executor executor;
+
+ private final IOManager ioManager;
+
+ public MaterializedPartition(IHyracksTaskContext ctx, FileReference partitionFile, Executor executor,
+ IOManager ioManager) {
+ this.ctx = ctx;
+ this.partitionFile = partitionFile;
+ this.executor = executor;
+ this.ioManager = ioManager;
+ }
+
+ @Override
+ public IHyracksTaskContext getTaskContext() {
+ return ctx;
+ }
+
+ @Override
+ public void deallocate() {
+ partitionFile.delete();
+ }
+
+ @Override
+ public void writeTo(final IFrameWriter writer) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ IFileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ try {
+ writer.open();
+ try {
+ long offset = 0;
+ ByteBuffer buffer = ctx.allocateFrame();
+ while (true) {
+ buffer.clear();
+ long size = ioManager.syncRead(fh, offset, buffer);
+ if (size < 0) {
+ break;
+ } else if (size < buffer.capacity()) {
+ throw new HyracksDataException("Premature end of file");
+ }
+ offset += size;
+ buffer.flip();
+ writer.nextFrame(buffer);
+ }
+ } finally {
+ writer.close();
+ }
+ } finally {
+ ioManager.close(fh);
+ }
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public boolean isReusable() {
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
new file mode 100644
index 0000000..16e31f7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class MaterializedPartitionInputChannel implements IInputChannel {
+ private final int nBuffers;
+
+ private final Queue<ByteBuffer> emptyQueue;
+
+ private final Queue<ByteBuffer> fullQueue;
+
+ private final PartitionId pid;
+
+ private final PartitionManager manager;
+
+ private final FrameWriter writer;
+
+ private IInputChannelMonitor monitor;
+
+ private Object attachment;
+
+ public MaterializedPartitionInputChannel(int nBuffers, PartitionId pid, PartitionManager manager) {
+ this.nBuffers = nBuffers;
+ this.emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+ fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+ this.pid = pid;
+ this.manager = manager;
+ writer = new FrameWriter();
+ }
+
+ @Override
+ public void registerMonitor(IInputChannelMonitor monitor) {
+ this.monitor = monitor;
+ }
+
+ @Override
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ @Override
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ @Override
+ public ByteBuffer getNextBuffer() {
+ return fullQueue.poll();
+ }
+
+ @Override
+ public void recycleBuffer(ByteBuffer buffer) {
+ buffer.clear();
+ synchronized (this) {
+ emptyQueue.add(buffer);
+ notifyAll();
+ }
+ }
+
+ @Override
+ public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+ for (int i = 0; i < nBuffers; ++i) {
+ emptyQueue.add(ctx.allocateFrame());
+ }
+ IPartition partition = manager.getPartition(pid);
+ partition.writeTo(writer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+
+ private class FrameWriter implements IFrameWriter {
+ @Override
+ public void open() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ synchronized (MaterializedPartitionInputChannel.this) {
+ while (emptyQueue.isEmpty()) {
+ try {
+ MaterializedPartitionInputChannel.this.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ ByteBuffer destFrame = emptyQueue.poll();
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ destFrame.clear();
+ destFrame.put(buffer);
+ fullQueue.add(destFrame);
+ monitor.notifyDataAvailability(MaterializedPartitionInputChannel.this, 1);
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ monitor.notifyEndOfStream(MaterializedPartitionInputChannel.this);
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
new file mode 100644
index 0000000..7bd0eb1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializedPartitionWriter implements IFrameWriter {
+ private static final Logger LOGGER = Logger.getLogger(MaterializedPartitionWriter.class.getName());
+
+ private final IHyracksTaskContext ctx;
+
+ private final PartitionManager manager;
+
+ private final PartitionId pid;
+
+ private final TaskAttemptId taId;
+
+ private final Executor executor;
+
+ private FileReference fRef;
+
+ private IFileHandle handle;
+
+ private long size;
+
+ private boolean failed;
+
+ public MaterializedPartitionWriter(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
+ TaskAttemptId taId, Executor executor) {
+ this.ctx = ctx;
+ this.manager = manager;
+ this.pid = pid;
+ this.taId = taId;
+ this.executor = executor;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("open(" + pid + " by " + taId);
+ }
+ fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+ handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ size = 0;
+ failed = false;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ size += ctx.getIOManager().syncWrite(handle, size, buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ failed = true;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("close(" + pid + " by " + taId);
+ }
+ ctx.getIOManager().close(handle);
+ if (!failed) {
+ manager.registerPartition(pid, taId,
+ new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()),
+ PartitionState.COMMITTED);
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
new file mode 100644
index 0000000..62320c5
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializingPipelinedPartition implements IFrameWriter, IPartition {
+ private static final Logger LOGGER = Logger.getLogger(MaterializingPipelinedPartition.class.getName());
+
+ private final IHyracksTaskContext ctx;
+
+ private final Executor executor;
+
+ private final IOManager ioManager;
+
+ private final PartitionManager manager;
+
+ private final PartitionId pid;
+
+ private final TaskAttemptId taId;
+
+ private FileReference fRef;
+
+ private IFileHandle handle;
+
+ private long size;
+
+ private boolean eos;
+
+ private boolean failed;
+
+ public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
+ TaskAttemptId taId, Executor executor) {
+ this.ctx = ctx;
+ this.executor = executor;
+ this.ioManager = (IOManager) ctx.getIOManager();
+ this.manager = manager;
+ this.pid = pid;
+ this.taId = taId;
+ }
+
+ @Override
+ public IHyracksTaskContext getTaskContext() {
+ return ctx;
+ }
+
+ @Override
+ public void deallocate() {
+ fRef.delete();
+ }
+
+ @Override
+ public void writeTo(final IFrameWriter writer) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ IFileHandle fh = ioManager.open(fRef, IIOManager.FileReadWriteMode.READ_ONLY,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ try {
+ writer.open();
+ try {
+ long offset = 0;
+ ByteBuffer buffer = ctx.allocateFrame();
+ boolean fail = false;
+ boolean done = false;
+ while (!fail && !done) {
+ synchronized (MaterializingPipelinedPartition.this) {
+ while (offset >= size && !eos && !failed) {
+ try {
+ MaterializingPipelinedPartition.this.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ fail = failed;
+ done = eos && offset >= size;
+ }
+ if (fail) {
+ writer.fail();
+ } else if (!done) {
+ buffer.clear();
+ long readLen = ioManager.syncRead(fh, offset, buffer);
+ if (readLen < buffer.capacity()) {
+ throw new HyracksDataException("Premature end of file");
+ }
+ offset += readLen;
+ buffer.flip();
+ writer.nextFrame(buffer);
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ } finally {
+ ioManager.close(fh);
+ }
+ } catch (HyracksDataException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public boolean isReusable() {
+ return true;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("open(" + pid + " by " + taId);
+ }
+ fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+ handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ size = 0;
+ eos = false;
+ failed = false;
+ manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+ }
+
+ @Override
+ public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ size += ctx.getIOManager().syncWrite(handle, size, buffer);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void fail() throws HyracksDataException {
+ failed = true;
+ notifyAll();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("close(" + pid + " by " + taId);
+ }
+ boolean commit = false;
+ synchronized (this) {
+ eos = true;
+ ctx.getIOManager().close(handle);
+ handle = null;
+ commit = !failed;
+ notifyAll();
+ }
+ if (commit) {
+ manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
new file mode 100644
index 0000000..45c091a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.net.NetworkOutputChannel;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
+
+public class PartitionManager {
+ private final NodeControllerService ncs;
+
+ private final Map<PartitionId, List<IPartition>> partitionMap;
+
+ private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+ private final IWorkspaceFileFactory fileFactory;
+
+ public PartitionManager(NodeControllerService ncs) {
+ this.ncs = ncs;
+ partitionMap = new HashMap<PartitionId, List<IPartition>>();
+ deallocatableRegistry = new DefaultDeallocatableRegistry();
+ fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
+ }
+
+ public void registerPartition(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+ throws HyracksDataException {
+ synchronized (this) {
+ List<IPartition> pList = partitionMap.get(pid);
+ if (pList == null) {
+ pList = new ArrayList<IPartition>();
+ partitionMap.put(pid, pList);
+ }
+ pList.add(partition);
+ }
+ updatePartitionState(pid, taId, partition, state);
+ }
+
+ public void updatePartitionState(PartitionId pid, TaskAttemptId taId, IPartition partition, PartitionState state)
+ throws HyracksDataException {
+ PartitionDescriptor desc = new PartitionDescriptor(pid, ncs.getId(), taId, partition.isReusable());
+ desc.setState(state);
+ try {
+ ncs.getClusterController().registerPartitionProvider(desc);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ public synchronized IPartition getPartition(PartitionId pid) {
+ return partitionMap.get(pid).get(0);
+ }
+
+ public synchronized void unregisterPartitions(JobId jobId, Collection<IPartition> unregisteredPartitions) {
+ for (Iterator<Map.Entry<PartitionId, List<IPartition>>> i = partitionMap.entrySet().iterator(); i.hasNext();) {
+ Map.Entry<PartitionId, List<IPartition>> e = i.next();
+ PartitionId pid = e.getKey();
+ if (jobId.equals(pid.getJobId())) {
+ for (IPartition p : e.getValue()) {
+ unregisteredPartitions.add(p);
+ }
+ i.remove();
+ }
+ }
+ }
+
+ public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer)
+ throws HyracksException {
+ List<IPartition> pList = partitionMap.get(partitionId);
+ if (pList != null && !pList.isEmpty()) {
+ IPartition partition = pList.get(0);
+ writer.setTaskContext(partition.getTaskContext());
+ partition.writeTo(writer);
+ if (!partition.isReusable()) {
+ partitionMap.remove(partitionId);
+ }
+ } else {
+ throw new HyracksException("Request for unknown partition " + partitionId);
+ }
+ }
+
+ public IWorkspaceFileFactory getFileFactory() {
+ return fileFactory;
+ }
+
+ public void close() {
+ deallocatableRegistry.close();
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
new file mode 100644
index 0000000..e427cf3
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+
+public class PipelinedPartition implements IFrameWriter, IPartition {
+ private final IHyracksTaskContext ctx;
+
+ private final PartitionManager manager;
+
+ private final PartitionId pid;
+
+ private final TaskAttemptId taId;
+
+ private IFrameWriter delegate;
+
+ private boolean pendingConnection;
+
+ private boolean failed;
+
+ public PipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
+ this.ctx = ctx;
+ this.manager = manager;
+ this.pid = pid;
+ this.taId = taId;
+ }
+
+ @Override
+ public IHyracksTaskContext getTaskContext() {
+ return ctx;
+ }
+
+ @Override
+ public boolean isReusable() {
+ return false;
+ }
+
+ @Override
+ public void deallocate() {
+ // do nothing
+ }
+
+ @Override
+ public synchronized void writeTo(IFrameWriter writer) {
+ delegate = writer;
+ notifyAll();
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ manager.registerPartition(pid, taId, this, PartitionState.STARTED);
+ failed = false;
+ pendingConnection = true;
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ensureConnected();
+ delegate.nextFrame(buffer);
+ }
+
+ private void ensureConnected() throws HyracksDataException {
+ if (pendingConnection) {
+ synchronized (this) {
+ while (delegate == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ delegate.open();
+ }
+ pendingConnection = false;
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ failed = true;
+ if (delegate != null) {
+ delegate.fail();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (!failed) {
+ ensureConnected();
+ manager.updatePartitionState(pid, taId, this, PartitionState.COMMITTED);
+ delegate.close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
new file mode 100644
index 0000000..b1e58fc
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class ReceiveSideMaterializingCollector implements IPartitionCollector {
+ private final IHyracksTaskContext ctx;
+
+ private PartitionManager manager;
+
+ private final IPartitionCollector delegate;
+
+ private final TaskAttemptId taId;
+
+ private final Executor executor;
+
+ public ReceiveSideMaterializingCollector(IHyracksTaskContext ctx, PartitionManager manager,
+ IPartitionCollector collector, TaskAttemptId taId, Executor executor) {
+ this.ctx = ctx;
+ this.manager = manager;
+ this.delegate = collector;
+ this.taId = taId;
+ this.executor = executor;
+ }
+
+ @Override
+ public JobId getJobId() {
+ return delegate.getJobId();
+ }
+
+ @Override
+ public ConnectorDescriptorId getConnectorId() {
+ return delegate.getConnectorId();
+ }
+
+ @Override
+ public int getReceiverIndex() {
+ return delegate.getReceiverIndex();
+ }
+
+ @Override
+ public void open() throws HyracksException {
+ delegate.open();
+ }
+
+ @Override
+ public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
+ for (final PartitionChannel pc : partitions) {
+ PartitionWriter writer = new PartitionWriter(pc);
+ executor.execute(writer);
+ }
+ }
+
+ private class PartitionWriter implements Runnable, IInputChannelMonitor {
+ private PartitionChannel pc;
+
+ private final AtomicInteger nAvailableFrames;
+
+ private final AtomicBoolean eos;
+
+ private final AtomicBoolean failed;
+
+ public PartitionWriter(PartitionChannel pc) {
+ this.pc = pc;
+ nAvailableFrames = new AtomicInteger(0);
+ eos = new AtomicBoolean(false);
+ failed = new AtomicBoolean(false);
+ }
+
+ @Override
+ public synchronized void notifyFailure(IInputChannel channel) {
+ failed.set(true);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
+ nAvailableFrames.addAndGet(nFrames);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void notifyEndOfStream(IInputChannel channel) {
+ eos.set(true);
+ notifyAll();
+ }
+
+ @Override
+ public void run() {
+ PartitionId pid = pc.getPartitionId();
+ MaterializedPartitionWriter mpw = new MaterializedPartitionWriter(ctx, manager, pid, taId, executor);
+ IInputChannel channel = pc.getInputChannel();
+ try {
+ channel.registerMonitor(this);
+ channel.open(ctx);
+ mpw.open();
+ while (true) {
+ if (nAvailableFrames.get() > 0) {
+ ByteBuffer buffer = channel.getNextBuffer();
+ nAvailableFrames.decrementAndGet();
+ mpw.nextFrame(buffer);
+ channel.recycleBuffer(buffer);
+ } else if (eos.get()) {
+ break;
+ } else if (failed.get()) {
+ throw new HyracksDataException("Failure occurred on input");
+ } else {
+ try {
+ synchronized (this) {
+ if (nAvailableFrames.get() <= 0 && !eos.get() && !failed.get()) {
+ wait();
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ mpw.close();
+ channel.close();
+ delegate.addPartitions(Collections.singleton(new PartitionChannel(pid,
+ new MaterializedPartitionInputChannel(1, pid, manager))));
+ } catch (HyracksException e) {
+ }
+ }
+ }
+
+ @Override
+ public IFrameReader getReader() throws HyracksException {
+ return delegate.getReader();
+ }
+
+ @Override
+ public void close() throws HyracksException {
+ delegate.close();
+ }
+
+ @Override
+ public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
+ return delegate.getRequiredPartitionIds();
+ }
+
+ @Override
+ public void abort() {
+ delegate.abort();
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java
new file mode 100644
index 0000000..96fcf75
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorReceiverProfilingFrameReader.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.profiling;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+
+public class ConnectorReceiverProfilingFrameReader implements IFrameReader {
+ private final IFrameReader reader;
+ private final ICounter openCounter;
+ private final ICounter closeCounter;
+ private final ICounter frameCounter;
+
+ public ConnectorReceiverProfilingFrameReader(IHyracksTaskContext ctx, IFrameReader reader,
+ ConnectorDescriptorId cdId, int receiverIndex) {
+ this.reader = reader;
+ this.openCounter = ctx.getCounterContext().getCounter(cdId + ".receiver." + receiverIndex + ".open", true);
+ this.closeCounter = ctx.getCounterContext().getCounter(cdId + ".receiver." + receiverIndex + ".close", true);
+ this.frameCounter = ctx.getCounterContext()
+ .getCounter(cdId + ".receiver." + receiverIndex + ".nextFrame", true);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ reader.open();
+ openCounter.update(1);
+ }
+
+ @Override
+ public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ boolean status = reader.nextFrame(buffer);
+ if (status) {
+ frameCounter.update(1);
+ }
+ return status;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ reader.close();
+ closeCounter.update(1);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
new file mode 100644
index 0000000..be7b319
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.profiling;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+
+public class ConnectorSenderProfilingFrameWriter implements IFrameWriter {
+ private final IFrameWriter writer;
+ private final ICounter openCounter;
+ private final ICounter closeCounter;
+ private final ICounter frameCounter;
+
+ public ConnectorSenderProfilingFrameWriter(IHyracksTaskContext ctx, IFrameWriter writer,
+ ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
+ this.writer = writer;
+ int attempt = ctx.getTaskAttemptId().getAttempt();
+ this.openCounter = ctx.getCounterContext().getCounter(
+ cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".open", true);
+ this.closeCounter = ctx.getCounterContext().getCounter(
+ cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".close", true);
+ this.frameCounter = ctx.getCounterContext().getCounter(
+ cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".nextFrame", true);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ openCounter.update(1);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ frameCounter.update(1);
+ writer.nextFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ closeCounter.update(1);
+ writer.close();
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
new file mode 100644
index 0000000..affa01c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/profiling/ProfilingPartitionWriterFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.profiling;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.PartitionProfile;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class ProfilingPartitionWriterFactory implements IPartitionWriterFactory {
+ private static final int N_SAMPLES = 64;
+
+ private final IHyracksTaskContext ctx;
+
+ private final IConnectorDescriptor cd;
+
+ private final int senderIndex;
+
+ private final IPartitionWriterFactory delegate;
+
+ public ProfilingPartitionWriterFactory(IHyracksTaskContext ctx, IConnectorDescriptor cd, int senderIndex,
+ IPartitionWriterFactory delegate) {
+ this.ctx = ctx;
+ this.cd = cd;
+ this.senderIndex = senderIndex;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public IFrameWriter createFrameWriter(final int receiverIndex) throws HyracksDataException {
+ final IFrameWriter writer = new ConnectorSenderProfilingFrameWriter(ctx,
+ delegate.createFrameWriter(receiverIndex), cd.getConnectorId(), senderIndex, receiverIndex);
+ return new IFrameWriter() {
+ private long openTime;
+
+ private long closeTime;
+
+ MultiResolutionEventProfiler mrep = new MultiResolutionEventProfiler(N_SAMPLES);
+
+ @Override
+ public void open() throws HyracksDataException {
+ openTime = System.currentTimeMillis();
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ mrep.reportEvent();
+ writer.nextFrame(buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ closeTime = System.currentTimeMillis();
+ ((Task) ctx).setPartitionSendProfile(new PartitionProfile(new PartitionId(ctx.getJobletContext()
+ .getJobId(), cd.getConnectorId(), senderIndex, receiverIndex), openTime, closeTime, mrep));
+ writer.close();
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/DefaultDeallocatableRegistry.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/DefaultDeallocatableRegistry.java
new file mode 100644
index 0000000..863ab7b
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/resources/DefaultDeallocatableRegistry.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.resources;
+
+import java.util.List;
+import java.util.Vector;
+
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
+
+public class DefaultDeallocatableRegistry implements IDeallocatableRegistry {
+ private final List<IDeallocatable> deallocatables;
+
+ public DefaultDeallocatableRegistry() {
+ deallocatables = new Vector<IDeallocatable>();
+ }
+
+ @Override
+ public void registerDeallocatable(IDeallocatable deallocatable) {
+ deallocatables.add(deallocatable);
+ }
+
+ public void close() {
+ for (IDeallocatable d : deallocatables) {
+ try {
+ d.deallocate();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
new file mode 100644
index 0000000..4651149
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.runtime;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class RootHyracksContext implements IHyracksRootContext {
+ private final NodeControllerService ncs;
+
+ private final IIOManager ioManager;
+
+ public RootHyracksContext(NodeControllerService ncs, IIOManager ioManager) {
+ this.ncs = ncs;
+ this.ioManager = ioManager;
+ }
+
+ @Override
+ public IIOManager getIOManager() {
+ return ioManager;
+ }
+
+ @Override
+ public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
+ return ncs.getNodeControllersInfo();
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
new file mode 100644
index 0000000..8f8c032
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class AbortTasksWork extends AbstractWork {
+ private static final Logger LOGGER = Logger.getLogger(AbortTasksWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ private final JobId jobId;
+
+ private final List<TaskAttemptId> tasks;
+
+ public AbortTasksWork(NodeControllerService ncs, JobId jobId, List<TaskAttemptId> tasks) {
+ this.ncs = ncs;
+ this.jobId = jobId;
+ this.tasks = tasks;
+ }
+
+ @Override
+ public void run() {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
+ }
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet ji = jobletMap.get(jobId);
+ if (ji != null) {
+ Map<TaskAttemptId, Task> taskMap = ji.getTaskMap();
+ for (TaskAttemptId taId : tasks) {
+ Task task = taskMap.get(taId);
+ if (task != null) {
+ task.abort();
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
new file mode 100644
index 0000000..deb1b75
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.messages.IMessage;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+/**
+ * @author rico
+ *
+ */
+public class ApplicationMessageWork extends AbstractWork {
+
+ private static final Logger LOGGER = Logger.getLogger(ApplicationMessageWork.class.getName());
+ private byte[] message;
+ private String nodeId;
+ private NodeControllerService ncs;
+ private String appName;
+
+ public ApplicationMessageWork(NodeControllerService ncs, byte[] message, String appName, String nodeId) {
+ this.ncs = ncs;
+ this.nodeId = nodeId;
+ this.message = message;
+ this.appName = appName;
+ }
+
+ @Override
+ public void run() {
+
+ NCApplicationContext ctx = ncs.getApplications().get(appName);
+ try {
+ IMessage data = (IMessage) ctx.deserialize(message);
+ if (ctx.getMessageBroker() != null) {
+ ctx.getMessageBroker().receivedMessage(data, nodeId);
+ } else {
+ LOGGER.log(Level.WARNING, "Messsage was sent, but no Message Broker set!");
+ }
+ } catch (IOException e) {
+ Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in application message delivery!", e);
+ } catch (ClassNotFoundException e) {
+ Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Error in application message delivery!", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "nodeID: " + nodeId;
+ }
+
+}
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
new file mode 100644
index 0000000..574bc6d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/BuildJobProfilesWork.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class BuildJobProfilesWork extends SynchronizableWork {
+ private final NodeControllerService ncs;
+
+ private final FutureValue<List<JobProfile>> fv;
+
+ public BuildJobProfilesWork(NodeControllerService ncs, FutureValue<List<JobProfile>> fv) {
+ this.ncs = ncs;
+ this.fv = fv;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ List<JobProfile> profiles = new ArrayList<JobProfile>();
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ for (Joblet ji : jobletMap.values()) {
+ profiles.add(new JobProfile(ji.getJobId()));
+ }
+ for (JobProfile jProfile : profiles) {
+ Joblet ji;
+ JobletProfile jobletProfile = new JobletProfile(ncs.getId());
+ ji = jobletMap.get(jProfile.getJobId());
+ if (ji != null) {
+ ji.dumpProfile(jobletProfile);
+ jProfile.getJobletProfiles().put(ncs.getId(), jobletProfile);
+ }
+ }
+ fv.setValue(profiles);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
new file mode 100644
index 0000000..173ab92
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CleanupJobletWork.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.IPartition;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class CleanupJobletWork extends AbstractWork {
+ private static final Logger LOGGER = Logger.getLogger(CleanupJobletWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ private final JobId jobId;
+
+ private JobStatus status;
+
+ public CleanupJobletWork(NodeControllerService ncs, JobId jobId, JobStatus status) {
+ this.ncs = ncs;
+ this.jobId = jobId;
+ this.status = status;
+ }
+
+ @Override
+ public void run() {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Cleaning up after job: " + jobId);
+ }
+ final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
+ ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
+ ncs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ for (IPartition p : unregisteredPartitions) {
+ p.deallocate();
+ }
+ }
+ });
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet joblet = jobletMap.remove(jobId);
+ if (joblet != null) {
+ joblet.cleanup(status);
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
new file mode 100644
index 0000000..6eb1a95
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/CreateApplicationWork.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+public class CreateApplicationWork extends AbstractWork {
+ private final NodeControllerService ncs;
+
+ private final String appName;
+
+ private final boolean deployHar;
+
+ private final byte[] serializedDistributedState;
+
+ public CreateApplicationWork(NodeControllerService ncs, String appName, boolean deployHar,
+ byte[] serializedDistributedState) {
+ this.ncs = ncs;
+ this.appName = appName;
+ this.deployHar = deployHar;
+ this.serializedDistributedState = serializedDistributedState;
+ }
+
+ @Override
+ public void run() {
+ try {
+ NCApplicationContext appCtx;
+ Map<String, NCApplicationContext> applications = ncs.getApplications();
+ if (applications.containsKey(appName)) {
+ throw new HyracksException("Duplicate application with name: " + appName + " being created.");
+ }
+ appCtx = new NCApplicationContext(ncs.getServerContext(), ncs.getRootContext(), appName, ncs.getId());
+ applications.put(appName, appCtx);
+ if (deployHar) {
+ NCConfig ncConfig = ncs.getConfiguration();
+ NodeParameters nodeParameters = ncs.getNodeParameters();
+ HttpClient hc = new DefaultHttpClient();
+ HttpGet get = new HttpGet("http://" + ncConfig.ccHost + ":"
+ + nodeParameters.getClusterControllerInfo().getWebPort() + "/applications/" + appName);
+ HttpResponse response = hc.execute(get);
+ InputStream is = response.getEntity().getContent();
+ OutputStream os = appCtx.getHarOutputStream();
+ try {
+ IOUtils.copyLarge(is, os);
+ } finally {
+ os.close();
+ is.close();
+ }
+ }
+ appCtx.initializeClassPath();
+ appCtx.setDistributedState((Serializable) appCtx.deserialize(serializedDistributedState));
+ appCtx.initialize();
+ ncs.getClusterController()
+ .notifyApplicationStateChange(ncs.getId(), appName, ApplicationStatus.INITIALIZED);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
new file mode 100644
index 0000000..b104ce8
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/DestroyApplicationWork.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+
+public class DestroyApplicationWork extends AbstractWork {
+ private final NodeControllerService ncs;
+
+ private final String appName;
+
+ public DestroyApplicationWork(NodeControllerService ncs, String appName) {
+ this.ncs = ncs;
+ this.appName = appName;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Map<String, NCApplicationContext> applications = ncs.getApplications();
+ ApplicationContext appCtx = applications.remove(appName);
+ if (appCtx != null) {
+ appCtx.deinitialize();
+ }
+ ncs.getClusterController().notifyApplicationStateChange(ncs.getId(), appName,
+ ApplicationStatus.DEINITIALIZED);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java
new file mode 100644
index 0000000..022c92f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class NotifyTaskCompleteWork extends AbstractWork {
+ private final NodeControllerService ncs;
+ private final Task task;
+
+ public NotifyTaskCompleteWork(NodeControllerService ncs, Task task) {
+ this.ncs = ncs;
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile());
+ task.dumpProfile(taskProfile);
+ try {
+ ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
+ ncs.getId(), taskProfile);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ task.getJoblet().removeTask(task);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
new file mode 100644
index 0000000..3957934
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+
+public class NotifyTaskFailureWork extends AbstractWork {
+ private final NodeControllerService ncs;
+ private final Task task;
+ private final String details;
+
+ public NotifyTaskFailureWork(NodeControllerService ncs, Task task, String details) {
+ this.ncs = ncs;
+ this.task = task;
+ this.details = details;
+ }
+
+ @Override
+ public void run() {
+ try {
+ ncs.getClusterController().notifyTaskFailure(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
+ ncs.getId(), details);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ task.getJoblet().removeTask(task);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
new file mode 100644
index 0000000..bb9669d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
+
+public class ReportPartitionAvailabilityWork extends AbstractWork {
+ private final NodeControllerService ncs;
+
+ private final PartitionId pid;
+
+ private final NetworkAddress networkAddress;
+
+ public ReportPartitionAvailabilityWork(NodeControllerService ncs, PartitionId pid, NetworkAddress networkAddress) {
+ this.ncs = ncs;
+ this.pid = pid;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet ji = jobletMap.get(pid.getJobId());
+ if (ji != null) {
+ PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getNetworkManager(),
+ new InetSocketAddress(InetAddress.getByAddress(networkAddress.getIpAddress()),
+ networkAddress.getPort()), pid, 5));
+ ji.reportPartitionAvailability(channel);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
new file mode 100644
index 0000000..0c0fa3d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.nc.Joblet;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.Task;
+import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
+import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
+import edu.uci.ics.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
+import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
+import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
+import edu.uci.ics.hyracks.control.nc.profiling.ProfilingPartitionWriterFactory;
+
+public class StartTasksWork extends AbstractWork {
+ private static final Logger LOGGER = Logger.getLogger(StartTasksWork.class.getName());
+
+ private final NodeControllerService ncs;
+
+ private final String appName;
+
+ private final JobId jobId;
+
+ private final byte[] acgBytes;
+
+ private final List<TaskAttemptDescriptor> taskDescriptors;
+
+ private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap;
+
+ private final EnumSet<JobFlag> flags;
+
+ public StartTasksWork(NodeControllerService ncs, String appName, JobId jobId, byte[] acgBytes,
+ List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, EnumSet<JobFlag> flags) {
+ this.ncs = ncs;
+ this.appName = appName;
+ this.jobId = jobId;
+ this.acgBytes = acgBytes;
+ this.taskDescriptors = taskDescriptors;
+ this.connectorPoliciesMap = connectorPoliciesMap;
+ this.flags = flags;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Map<String, NCApplicationContext> applications = ncs.getApplications();
+ NCApplicationContext appCtx = applications.get(appName);
+ final Joblet joblet = getOrCreateLocalJoblet(jobId, appCtx, acgBytes == null ? null
+ : (ActivityClusterGraph) appCtx.deserialize(acgBytes));
+ final ActivityClusterGraph acg = joblet.getActivityClusterGraph();
+
+ IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
+ @Override
+ public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
+ ActivityCluster ac = acg.getActivityMap().get(aid);
+ IConnectorDescriptor conn = ac.getActivityOutputMap().get(aid).get(outputIndex);
+ return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ }
+
+ @Override
+ public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
+ ActivityCluster ac = acg.getActivityMap().get(aid);
+ IConnectorDescriptor conn = ac.getActivityInputMap().get(aid).get(inputIndex);
+ return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ }
+ };
+
+ for (TaskAttemptDescriptor td : taskDescriptors) {
+ TaskAttemptId taId = td.getTaskAttemptId();
+ TaskId tid = taId.getTaskId();
+ ActivityId aid = tid.getActivityId();
+ ActivityCluster ac = acg.getActivityMap().get(aid);
+ IActivity han = ac.getActivityMap().get(aid);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Initializing " + taId + " -> " + han);
+ }
+ final int partition = tid.getPartition();
+ Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs);
+ IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
+
+ List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
+
+ List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
+ if (inputs != null) {
+ for (int i = 0; i < inputs.size(); ++i) {
+ IConnectorDescriptor conn = inputs.get(i);
+ IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("input: " + i + ": " + conn.getConnectorId());
+ }
+ RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
+ recordDesc, cPolicy);
+ collectors.add(collector);
+ }
+ }
+ List<IConnectorDescriptor> outputs = ac.getActivityOutputMap().get(aid);
+ if (outputs != null) {
+ for (int i = 0; i < outputs.size(); ++i) {
+ final IConnectorDescriptor conn = outputs.get(i);
+ RecordDescriptor recordDesc = ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
+
+ IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
+ partition, taId, flags);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("output: " + i + ": " + conn.getConnectorId());
+ }
+ IFrameWriter writer = conn.createPartitioner(task, recordDesc, pwFactory, partition,
+ td.getPartitionCount(), td.getOutputPartitionCounts()[i]);
+ operator.setOutputFrameWriter(i, writer, recordDesc);
+ }
+ }
+
+ task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
+ joblet.addTask(task);
+
+ task.start();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Joblet getOrCreateLocalJoblet(JobId jobId, INCApplicationContext appCtx, ActivityClusterGraph acg)
+ throws Exception {
+ Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
+ Joblet ji = jobletMap.get(jobId);
+ if (ji == null) {
+ if (acg == null) {
+ throw new NullPointerException("JobActivityGraph was null");
+ }
+ ji = new Joblet(ncs, jobId, appCtx, acg);
+ jobletMap.put(jobId, ji);
+ }
+ return ji;
+ }
+
+ private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
+ int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
+ throws HyracksDataException {
+ IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+ td.getInputPartitionCounts()[i], td.getPartitionCount());
+ if (cPolicy.materializeOnReceiveSide()) {
+ return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
+ task.getTaskAttemptId(), ncs.getExecutor());
+ } else {
+ return collector;
+ }
+ }
+
+ private IPartitionWriterFactory createPartitionWriterFactory(final IHyracksTaskContext ctx,
+ IConnectorPolicy cPolicy, final JobId jobId, final IConnectorDescriptor conn, final int senderIndex,
+ final TaskAttemptId taId, EnumSet<JobFlag> flags) {
+ IPartitionWriterFactory factory;
+ if (cPolicy.materializeOnSendSide()) {
+ if (cPolicy.consumerWaitsForProducerToFinish()) {
+ factory = new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
+ conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+ }
+ };
+ } else {
+ factory = new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
+ jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+ }
+ };
+ }
+ } else {
+ factory = new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
+ conn.getConnectorId(), senderIndex, receiverIndex), taId);
+ }
+ };
+ }
+ if (flags.contains(JobFlag.PROFILE_RUNTIME)) {
+ factory = new ProfilingPartitionWriterFactory(ctx, conn, senderIndex, factory);
+ }
+ return factory;
+ }
+}
\ No newline at end of file