Merged hyracks_dev_next -r 1287 into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@1288 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks/hyracks-control/hyracks-control-common/pom.xml
new file mode 100644
index 0000000..c9b5e93
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -0,0 +1,42 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control-common</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-control</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.0-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>1.4</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
new file mode 100644
index 0000000..21e3e9d
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/AbstractRemoteService.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common;
+
+import edu.uci.ics.hyracks.control.common.service.IService;
+
+public abstract class AbstractRemoteService implements IService {
+ public AbstractRemoteService() {
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
new file mode 100644
index 0000000..9f29fbd
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationContext.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.application;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Properties;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+
+import edu.uci.ics.hyracks.api.application.IApplicationContext;
+import edu.uci.ics.hyracks.api.application.IBootstrap;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.control.common.context.ServerContext;
+
+public abstract class ApplicationContext implements IApplicationContext {
+ private static final String APPLICATION_ROOT = "applications";
+ private static final String CLUSTER_CONTROLLER_BOOTSTRAP_CLASS_KEY = "cc.bootstrap.class";
+ private static final String NODE_CONTROLLER_BOOTSTRAP_CLASS_KEY = "nc.bootstrap.class";
+
+ protected ServerContext serverCtx;
+ protected final String appName;
+ protected final File applicationRootDir;
+ protected ClassLoader classLoader;
+ protected ApplicationStatus status;
+ protected Properties deploymentDescriptor;
+ protected IBootstrap bootstrap;
+ protected Serializable distributedState;
+
+ public ApplicationContext(ServerContext serverCtx, String appName) throws IOException {
+ this.serverCtx = serverCtx;
+ this.appName = appName;
+ this.applicationRootDir = new File(new File(serverCtx.getBaseDir(), APPLICATION_ROOT), appName);
+ FileUtils.deleteDirectory(applicationRootDir);
+ applicationRootDir.mkdirs();
+ }
+
+ public String getApplicationName() {
+ return appName;
+ }
+
+ public void initializeClassPath() throws Exception {
+ if (expandArchive()) {
+ File expandedFolder = getExpandedFolder();
+ List<URL> urls = new ArrayList<URL>();
+ findJarFiles(expandedFolder, urls);
+ Collections.sort(urls, new Comparator<URL>() {
+ @Override
+ public int compare(URL o1, URL o2) {
+ return o1.getFile().compareTo(o2.getFile());
+ }
+ });
+ classLoader = new URLClassLoader(urls.toArray(new URL[urls.size()]));
+ } else {
+ classLoader = getClass().getClassLoader();
+ }
+ deploymentDescriptor = parseDeploymentDescriptor();
+ }
+
+ public void initialize() throws Exception {
+ if (deploymentDescriptor != null) {
+ String bootstrapClass = null;
+ switch (serverCtx.getServerType()) {
+ case CLUSTER_CONTROLLER: {
+ bootstrapClass = deploymentDescriptor.getProperty(CLUSTER_CONTROLLER_BOOTSTRAP_CLASS_KEY);
+ break;
+ }
+ case NODE_CONTROLLER: {
+ bootstrapClass = deploymentDescriptor.getProperty(NODE_CONTROLLER_BOOTSTRAP_CLASS_KEY);
+ break;
+ }
+ }
+ if (bootstrapClass != null) {
+ bootstrap = (IBootstrap) classLoader.loadClass(bootstrapClass).newInstance();
+ start();
+ }
+ }
+ }
+
+ protected abstract void start() throws Exception;
+
+ protected abstract void stop() throws Exception;
+
+ private void findJarFiles(File dir, List<URL> urls) throws MalformedURLException {
+ for (File f : dir.listFiles()) {
+ if (f.isDirectory()) {
+ findJarFiles(f, urls);
+ } else if (f.getName().endsWith(".jar") || f.getName().endsWith(".zip")) {
+ urls.add(f.toURI().toURL());
+ }
+ }
+ }
+
+ private Properties parseDeploymentDescriptor() throws IOException {
+ InputStream in = classLoader.getResourceAsStream("hyracks-deployment.properties");
+ Properties props = new Properties();
+ if (in != null) {
+ try {
+ props.load(in);
+ } finally {
+ in.close();
+ }
+ }
+ return props;
+ }
+
+ private boolean expandArchive() throws IOException {
+ File archiveFile = getArchiveFile();
+ if (archiveFile.exists()) {
+ File expandedFolder = getExpandedFolder();
+ FileUtils.deleteDirectory(expandedFolder);
+ ZipFile zf = new ZipFile(archiveFile);
+ for (Enumeration<? extends ZipEntry> i = zf.entries(); i.hasMoreElements();) {
+ ZipEntry ze = i.nextElement();
+ String name = ze.getName();
+ if (name.endsWith("/")) {
+ continue;
+ }
+ InputStream is = zf.getInputStream(ze);
+ OutputStream os = FileUtils.openOutputStream(new File(expandedFolder, name));
+ try {
+ IOUtils.copyLarge(is, os);
+ } finally {
+ os.close();
+ is.close();
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private File getExpandedFolder() {
+ return new File(applicationRootDir, "expanded");
+ }
+
+ public void deinitialize() throws Exception {
+ stop();
+ File expandedFolder = getExpandedFolder();
+ FileUtils.deleteDirectory(expandedFolder);
+ }
+
+ public Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
+ return JavaSerializationUtils.deserialize(bytes, classLoader);
+ }
+
+ public OutputStream getHarOutputStream() throws IOException {
+ return new FileOutputStream(getArchiveFile());
+ }
+
+ private File getArchiveFile() {
+ return new File(applicationRootDir, "application.har");
+ }
+
+ public InputStream getHarInputStream() throws IOException {
+ return new FileInputStream(getArchiveFile());
+ }
+
+ public boolean containsHar() {
+ return getArchiveFile().exists();
+ }
+
+ @Override
+ public Serializable getDistributedState() {
+ return distributedState;
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+
+ public void setStatus(ApplicationStatus status) {
+ this.status = status;
+ }
+
+ public ApplicationStatus getStatus() {
+ return status;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java
new file mode 100644
index 0000000..b45c4f1
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/application/ApplicationStatus.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.application;
+
+public enum ApplicationStatus {
+ CREATED,
+ IN_INITIALIZATION,
+ INITIALIZED,
+ IN_DEINITIALIZATION,
+ DEINITIALIZED
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
new file mode 100644
index 0000000..a03f5c9
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.base;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+
+public interface IClusterController {
+ public void registerNode(NodeRegistration reg) throws Exception;
+
+ public void unregisterNode(String nodeId) throws Exception;
+
+ public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+ throws Exception;
+
+ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception;
+
+ public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception;
+
+ public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
+
+ public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
+
+ public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception;
+
+ public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
+
+ public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
new file mode 100644
index 0000000..47f4ceb
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.base;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+
+public interface INodeController {
+ public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception;
+
+ public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception;
+
+ public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception;
+
+ public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
+ throws Exception;
+
+ public void destroyApplication(String appName) throws Exception;
+
+ public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/context/ServerContext.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/context/ServerContext.java
new file mode 100644
index 0000000..7eca794
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/context/ServerContext.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.context;
+
+import java.io.File;
+
+public class ServerContext {
+ public enum ServerType {
+ CLUSTER_CONTROLLER,
+ NODE_CONTROLLER,
+ }
+
+ private final ServerType type;
+ private final File baseDir;
+
+ public ServerContext(ServerType type, File baseDir) throws Exception {
+ this.type = type;
+ this.baseDir = baseDir;
+ }
+
+ public ServerType getServerType() {
+ return type;
+ }
+
+ public File getBaseDir() {
+ return baseDir;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
new file mode 100644
index 0000000..350f527
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/CCConfig.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.controllers;
+
+import java.util.List;
+
+import org.kohsuke.args4j.Option;
+
+public class CCConfig {
+ @Option(name = "-client-net-ip-address", usage = "Sets the IP Address to listen for connections from clients", required = true)
+ public String clientNetIpAddress;
+
+ @Option(name = "-client-net-port", usage = "Sets the port to listen for connections from clients (default 1098)")
+ public int clientNetPort = 1098;
+
+ @Option(name = "-cluster-net-ip-address", usage = "Sets the IP Address to listen for connections from ", required = true)
+ public String clusterNetIpAddress;
+
+ @Option(name = "-cluster-net-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
+ public int clusterNetPort = 1099;
+
+ @Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 19001)")
+ public int httpPort = 16001;
+
+ @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
+ public int heartbeatPeriod = 10000;
+
+ @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
+ public int maxHeartbeatLapsePeriods = 5;
+
+ @Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node controller in milliseconds. 0 to disable. (default: 0)")
+ public int profileDumpPeriod = 0;
+
+ @Option(name = "-default-max-job-attempts", usage = "Sets the default number of job attempts allowed if not specified in the job specification. (default: 5)")
+ public int defaultMaxJobAttempts = 5;
+
+ @Option(name = "-job-history-size", usage = "Limits the number of historical jobs remembered by the system to the specified value. (default: 10)")
+ public int jobHistorySize = 10;
+
+ @Option(name = "-cc-root", usage = "Sets the root folder used for file operations. (default: ClusterControllerService)")
+ public String ccRoot = "ClusterControllerService";
+
+ public void toCommandLine(List<String> cList) {
+ cList.add("-client-net-ip-address");
+ cList.add(clientNetIpAddress);
+ cList.add("-client-net-port");
+ cList.add(String.valueOf(clientNetPort));
+ cList.add("-cluster-net-ip-address");
+ cList.add(clusterNetIpAddress);
+ cList.add("-cluster-net-port");
+ cList.add(String.valueOf(clusterNetPort));
+ cList.add("-http-port");
+ cList.add(String.valueOf(httpPort));
+ cList.add("-heartbeat-period");
+ cList.add(String.valueOf(heartbeatPeriod));
+ cList.add("-max-heartbeat-lapse-periods");
+ cList.add(String.valueOf(maxHeartbeatLapsePeriods));
+ cList.add("-profile-dump-period");
+ cList.add(String.valueOf(profileDumpPeriod));
+ cList.add("-default-max-job-attempts");
+ cList.add(String.valueOf(defaultMaxJobAttempts));
+ cList.add("-job-history-size");
+ cList.add(String.valueOf(jobHistorySize));
+ cList.add("-cc-root");
+ cList.add(ccRoot);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
new file mode 100644
index 0000000..574f552
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.controllers;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.kohsuke.args4j.Option;
+
+public class NCConfig implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ @Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
+ public String ccHost;
+
+ @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)")
+ public int ccPort = 1099;
+
+ @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
+ public String clusterNetIPAddress;
+
+ @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
+ public String nodeId;
+
+ @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
+ public String dataIPAddress;
+
+ @Option(name = "-frame-size", usage = "Frame Size to use for data communication (default: 32768)")
+ public int frameSize = 32768;
+
+ @Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
+ public String ioDevices = System.getProperty("java.io.tmpdir");
+
+ @Option(name = "-dcache-client-servers", usage = "Sets the list of DCache servers in the format host1:port1,host2:port2,... (default localhost:54583)")
+ public String dcacheClientServers = "localhost:54583";
+
+ @Option(name = "-dcache-client-server-local", usage = "Sets the local DCache server, if one is available, in the format host:port (default not set)")
+ public String dcacheClientServerLocal;
+
+ @Option(name = "-dcache-client-path", usage = "Sets the path to store the files retrieved from the DCache server (default /tmp/dcache-client)")
+ public String dcacheClientPath = "/tmp/dcache-client";
+
+ @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
+ public int nNetThreads = 1;
+
+ public void toCommandLine(List<String> cList) {
+ cList.add("-cc-host");
+ cList.add(ccHost);
+ cList.add("-cc-port");
+ cList.add(String.valueOf(ccPort));
+ cList.add("-cluster-net-ip-address");
+ cList.add(clusterNetIPAddress);
+ cList.add("-node-id");
+ cList.add(nodeId);
+ cList.add("-data-ip-address");
+ cList.add(dataIPAddress);
+ cList.add("-frame-size");
+ cList.add(String.valueOf(frameSize));
+ cList.add("-iodevices");
+ cList.add(ioDevices);
+ cList.add("-dcache-client-servers");
+ cList.add(dcacheClientServers);
+ if (dcacheClientServerLocal != null) {
+ cList.add("-dcache-client-server-local");
+ cList.add(dcacheClientServerLocal);
+ }
+ cList.add("-dcache-client-path");
+ cList.add(dcacheClientPath);
+ cList.add("-net-thread-count");
+ cList.add(String.valueOf(nNetThreads));
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
new file mode 100644
index 0000000..0161f96
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeParameters.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.controllers;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
+
+public class NodeParameters implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private ClusterControllerInfo ccInfo;
+
+ private int heartbeatPeriod;
+
+ private int profileDumpPeriod;
+
+ public ClusterControllerInfo getClusterControllerInfo() {
+ return ccInfo;
+ }
+
+ public void setClusterControllerInfo(ClusterControllerInfo ccInfo) {
+ this.ccInfo = ccInfo;
+ }
+
+ public int getHeartbeatPeriod() {
+ return heartbeatPeriod;
+ }
+
+ public void setHeartbeatPeriod(int heartbeatPeriod) {
+ this.heartbeatPeriod = heartbeatPeriod;
+ }
+
+ public int getProfileDumpPeriod() {
+ return profileDumpPeriod;
+ }
+
+ public void setProfileDumpPeriod(int profileDumpPeriod) {
+ this.profileDumpPeriod = profileDumpPeriod;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
new file mode 100644
index 0000000..91cfecf
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.controllers;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
+
+public final class NodeRegistration implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final InetSocketAddress ncAddress;
+
+ private final String nodeId;
+
+ private final NCConfig ncConfig;
+
+ private final NetworkAddress dataPort;
+
+ private final String osName;
+
+ private final String arch;
+
+ private final String osVersion;
+
+ private final int nProcessors;
+
+ private final String vmName;
+
+ private final String vmVersion;
+
+ private final String vmVendor;
+
+ private final String classpath;
+
+ private final String libraryPath;
+
+ private final String bootClasspath;
+
+ private final List<String> inputArguments;
+
+ private final Map<String, String> systemProperties;
+
+ private final HeartbeatSchema hbSchema;
+
+ public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
+ String osName, String arch, String osVersion, int nProcessors, String vmName, String vmVersion,
+ String vmVendor, String classpath, String libraryPath, String bootClasspath, List<String> inputArguments,
+ Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
+ this.ncAddress = ncAddress;
+ this.nodeId = nodeId;
+ this.ncConfig = ncConfig;
+ this.dataPort = dataPort;
+ this.osName = osName;
+ this.arch = arch;
+ this.osVersion = osVersion;
+ this.nProcessors = nProcessors;
+ this.vmName = vmName;
+ this.vmVersion = vmVersion;
+ this.vmVendor = vmVendor;
+ this.classpath = classpath;
+ this.libraryPath = libraryPath;
+ this.bootClasspath = bootClasspath;
+ this.inputArguments = inputArguments;
+ this.systemProperties = systemProperties;
+ this.hbSchema = hbSchema;
+ }
+
+ public InetSocketAddress getNodeControllerAddress() {
+ return ncAddress;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public NCConfig getNCConfig() {
+ return ncConfig;
+ }
+
+ public NetworkAddress getDataPort() {
+ return dataPort;
+ }
+
+ public String getOSName() {
+ return osName;
+ }
+
+ public String getArch() {
+ return arch;
+ }
+
+ public String getOSVersion() {
+ return osVersion;
+ }
+
+ public int getNProcessors() {
+ return nProcessors;
+ }
+
+ public HeartbeatSchema getHeartbeatSchema() {
+ return hbSchema;
+ }
+
+ public String getVmName() {
+ return vmName;
+ }
+
+ public String getVmVersion() {
+ return vmVersion;
+ }
+
+ public String getVmVendor() {
+ return vmVendor;
+ }
+
+ public String getClasspath() {
+ return classpath;
+ }
+
+ public String getLibraryPath() {
+ return libraryPath;
+ }
+
+ public String getBootClasspath() {
+ return bootClasspath;
+ }
+
+ public List<String> getInputArguments() {
+ return inputArguments;
+ }
+
+ public Map<String, String> getSystemProperties() {
+ return systemProperties;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
new file mode 100644
index 0000000..1dba3bc
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.heartbeat;
+
+import java.io.Serializable;
+
+public class HeartbeatData implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public long heapInitSize;
+ public long heapUsedSize;
+ public long heapCommittedSize;
+ public long heapMaxSize;
+ public long nonheapInitSize;
+ public long nonheapUsedSize;
+ public long nonheapCommittedSize;
+ public long nonheapMaxSize;
+ public int threadCount;
+ public int peakThreadCount;
+ public long totalStartedThreadCount;
+ public double systemLoadAverage;
+ public long[] gcCollectionCounts;
+ public long[] gcCollectionTimes;
+ public long netPayloadBytesRead;
+ public long netPayloadBytesWritten;
+ public long netSignalingBytesRead;
+ public long netSignalingBytesWritten;
+ public long ipcMessagesSent;
+ public long ipcMessageBytesSent;
+ public long ipcMessagesReceived;
+ public long ipcMessageBytesReceived;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatSchema.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatSchema.java
new file mode 100644
index 0000000..e53e9a8
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatSchema.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.heartbeat;
+
+import java.io.Serializable;
+
+public class HeartbeatSchema implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final GarbageCollectorInfo[] gcInfos;
+
+ public HeartbeatSchema(GarbageCollectorInfo[] gcInfos) {
+ this.gcInfos = gcInfos;
+ }
+
+ public GarbageCollectorInfo[] getGarbageCollectorInfos() {
+ return gcInfos;
+ }
+
+ public static class GarbageCollectorInfo implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String name;
+
+ public GarbageCollectorInfo(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
new file mode 100644
index 0000000..9ea4636
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -0,0 +1,792 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.PartitionState;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.ipc.api.IPayloadSerializerDeserializer;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+public class CCNCFunctions {
+ private static final int FID_CODE_SIZE = 1;
+
+ public enum FunctionId {
+ REGISTER_NODE,
+ UNREGISTER_NODE,
+ NOTIFY_JOBLET_CLEANUP,
+ NOTIFY_TASK_COMPLETE,
+ NOTIFY_TASK_FAILURE,
+ NODE_HEARTBEAT,
+ REPORT_PROFILE,
+ REGISTER_PARTITION_PROVIDER,
+ REGISTER_PARTITION_REQUEST,
+ APPLICATION_STATE_CHANGE_RESPONSE,
+
+ NODE_REGISTRATION_RESULT,
+ START_TASKS,
+ ABORT_TASKS,
+ CLEANUP_JOBLET,
+ CREATE_APPLICATION,
+ DESTROY_APPLICATION,
+ REPORT_PARTITION_AVAILABILITY,
+
+ OTHER
+ }
+
+ public static abstract class Function implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public abstract FunctionId getFunctionId();
+ }
+
+ public static class RegisterNodeFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final NodeRegistration reg;
+
+ public RegisterNodeFunction(NodeRegistration reg) {
+ this.reg = reg;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_NODE;
+ }
+
+ public NodeRegistration getNodeRegistration() {
+ return reg;
+ }
+ }
+
+ public static class UnregisterNodeFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+
+ public UnregisterNodeFunction(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.UNREGISTER_NODE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ public static class NotifyTaskCompleteFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final TaskAttemptId taskId;
+ private final String nodeId;
+ private final TaskProfile statistics;
+
+ public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
+ this.jobId = jobId;
+ this.taskId = taskId;
+ this.nodeId = nodeId;
+ this.statistics = statistics;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_TASK_COMPLETE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public TaskAttemptId getTaskId() {
+ return taskId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public TaskProfile getStatistics() {
+ return statistics;
+ }
+ }
+
+ public static class NotifyTaskFailureFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final TaskAttemptId taskId;
+ private final String nodeId;
+ private final String details;
+
+ public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
+ this.jobId = jobId;
+ this.taskId = taskId;
+ this.nodeId = nodeId;
+ this.details = details;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_TASK_FAILURE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public TaskAttemptId getTaskId() {
+ return taskId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public String getDetails() {
+ return details;
+ }
+ }
+
+ public static class NotifyJobletCleanupFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final String nodeId;
+
+ public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
+ this.jobId = jobId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_JOBLET_CLEANUP;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ public static class NodeHeartbeatFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final HeartbeatData hbData;
+
+ public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+ this.nodeId = nodeId;
+ this.hbData = hbData;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NODE_HEARTBEAT;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public HeartbeatData getHeartbeatData() {
+ return hbData;
+ }
+ }
+
+ public static class ReportProfileFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final List<JobProfile> profiles;
+
+ public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
+ this.nodeId = nodeId;
+ this.profiles = profiles;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_PROFILE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public List<JobProfile> getProfiles() {
+ return profiles;
+ }
+ }
+
+ public static class RegisterPartitionProviderFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionDescriptor partitionDescriptor;
+
+ public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
+ this.partitionDescriptor = partitionDescriptor;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_PARTITION_PROVIDER;
+ }
+
+ public PartitionDescriptor getPartitionDescriptor() {
+ return partitionDescriptor;
+ }
+
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+ DataInputStream dis = new DataInputStream(bais);
+
+ // Read PartitionId
+ PartitionId pid = readPartitionId(dis);
+
+ // Read nodeId
+ String nodeId = dis.readUTF();
+
+ // Read TaskAttemptId
+ TaskAttemptId taId = readTaskAttemptId(dis);
+
+ // Read reusable flag
+ boolean reusable = dis.readBoolean();
+
+ // Read Partition State
+ PartitionState state = readPartitionState(dis);
+
+ PartitionDescriptor pd = new PartitionDescriptor(pid, nodeId, taId, reusable);
+ pd.setState(state);
+ return new RegisterPartitionProviderFunction(pd);
+ }
+
+ public static void serialize(OutputStream out, Object object) throws Exception {
+ RegisterPartitionProviderFunction fn = (RegisterPartitionProviderFunction) object;
+
+ DataOutputStream dos = new DataOutputStream(out);
+
+ PartitionDescriptor pd = fn.getPartitionDescriptor();
+
+ // Write PartitionId
+ writePartitionId(dos, pd.getPartitionId());
+
+ // Write nodeId
+ dos.writeUTF(pd.getNodeId());
+
+ // Write TaskAttemptId
+ writeTaskAttemptId(dos, pd.getProducingTaskAttemptId());
+
+ // Write reusable flag
+ dos.writeBoolean(pd.isReusable());
+
+ // Write Partition State
+ writePartitionState(dos, pd.getState());
+ }
+ }
+
+ public static class RegisterPartitionRequestFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionRequest partitionRequest;
+
+ public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
+ this.partitionRequest = partitionRequest;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_PARTITION_REQUEST;
+ }
+
+ public PartitionRequest getPartitionRequest() {
+ return partitionRequest;
+ }
+
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+ DataInputStream dis = new DataInputStream(bais);
+
+ // Read PartitionId
+ PartitionId pid = readPartitionId(dis);
+
+ // Read nodeId
+ String nodeId = dis.readUTF();
+
+ // Read TaskAttemptId
+ TaskAttemptId taId = readTaskAttemptId(dis);
+
+ // Read Partition State
+ PartitionState state = readPartitionState(dis);
+
+ PartitionRequest pr = new PartitionRequest(pid, nodeId, taId, state);
+ return new RegisterPartitionRequestFunction(pr);
+ }
+
+ public static void serialize(OutputStream out, Object object) throws Exception {
+ RegisterPartitionRequestFunction fn = (RegisterPartitionRequestFunction) object;
+
+ DataOutputStream dos = new DataOutputStream(out);
+
+ PartitionRequest pr = fn.getPartitionRequest();
+
+ // Write PartitionId
+ writePartitionId(dos, pr.getPartitionId());
+
+ // Write nodeId
+ dos.writeUTF(pr.getNodeId());
+
+ // Write TaskAttemptId
+ writeTaskAttemptId(dos, pr.getRequestingTaskAttemptId());
+
+ // Write Partition State
+ writePartitionState(dos, pr.getMinimumState());
+ }
+ }
+
+ public static class ApplicationStateChangeResponseFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final String appName;
+ private final ApplicationStatus status;
+
+ public ApplicationStateChangeResponseFunction(String nodeId, String appName, ApplicationStatus status) {
+ this.nodeId = nodeId;
+ this.appName = appName;
+ this.status = status;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.APPLICATION_STATE_CHANGE_RESPONSE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public String getApplicationName() {
+ return appName;
+ }
+
+ public ApplicationStatus getStatus() {
+ return status;
+ }
+ }
+
+ public static class NodeRegistrationResult extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final NodeParameters params;
+
+ private final Exception exception;
+
+ public NodeRegistrationResult(NodeParameters params, Exception exception) {
+ this.params = params;
+ this.exception = exception;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NODE_REGISTRATION_RESULT;
+ }
+
+ public NodeParameters getNodeParameters() {
+ return params;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+ }
+
+ public static class StartTasksFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final JobId jobId;
+ private final byte[] planBytes;
+ private final List<TaskAttemptDescriptor> taskDescriptors;
+ private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+
+ public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
+ List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+ this.appName = appName;
+ this.jobId = jobId;
+ this.planBytes = planBytes;
+ this.taskDescriptors = taskDescriptors;
+ this.connectorPolicies = connectorPolicies;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_TASKS;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public byte[] getPlanBytes() {
+ return planBytes;
+ }
+
+ public List<TaskAttemptDescriptor> getTaskDescriptors() {
+ return taskDescriptors;
+ }
+
+ public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
+ return connectorPolicies;
+ }
+ }
+
+ public static class AbortTasksFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final List<TaskAttemptId> tasks;
+
+ public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
+ this.jobId = jobId;
+ this.tasks = tasks;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.ABORT_TASKS;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public List<TaskAttemptId> getTasks() {
+ return tasks;
+ }
+ }
+
+ public static class CleanupJobletFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final JobStatus status;
+
+ public CleanupJobletFunction(JobId jobId, JobStatus status) {
+ this.jobId = jobId;
+ this.status = status;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CLEANUP_JOBLET;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public JobStatus getStatus() {
+ return status;
+ }
+ }
+
+ public static class CreateApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final boolean deployHar;
+ private final byte[] serializedDistributedState;
+
+ public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
+ this.appName = appName;
+ this.deployHar = deployHar;
+ this.serializedDistributedState = serializedDistributedState;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CREATE_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public boolean isDeployHar() {
+ return deployHar;
+ }
+
+ public byte[] getSerializedDistributedState() {
+ return serializedDistributedState;
+ }
+ }
+
+ public static class DestroyApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public DestroyApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class ReportPartitionAvailabilityFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionId pid;
+ private final NetworkAddress networkAddress;
+
+ public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
+ this.pid = pid;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_PARTITION_AVAILABILITY;
+ }
+
+ public PartitionId getPartitionId() {
+ return pid;
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+
+ public static Object deserialize(ByteBuffer buffer, int length) throws Exception {
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length);
+ DataInputStream dis = new DataInputStream(bais);
+
+ // Read PartitionId
+ PartitionId pid = readPartitionId(dis);
+
+ // Read NetworkAddress
+ NetworkAddress networkAddress = readNetworkAddress(dis);
+
+ return new ReportPartitionAvailabilityFunction(pid, networkAddress);
+ }
+
+ public static void serialize(OutputStream out, Object object) throws Exception {
+ ReportPartitionAvailabilityFunction fn = (ReportPartitionAvailabilityFunction) object;
+
+ DataOutputStream dos = new DataOutputStream(out);
+
+ // Write PartitionId
+ writePartitionId(dos, fn.getPartitionId());
+
+ // Write NetworkAddress
+ writeNetworkAddress(dos, fn.getNetworkAddress());
+ }
+ }
+
+ public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
+ private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
+
+ public SerializerDeserializer() {
+ javaSerde = new JavaSerializationBasedPayloadSerializerDeserializer();
+ }
+
+ @Override
+ public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+ if (length < FID_CODE_SIZE) {
+ throw new IllegalStateException("Message size too small: " + length);
+ }
+ byte fid = buffer.get();
+ return deserialize(fid, buffer, length - FID_CODE_SIZE);
+ }
+
+ @Override
+ public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+ if (length < FID_CODE_SIZE) {
+ throw new IllegalStateException("Message size too small: " + length);
+ }
+ byte fid = buffer.get();
+ if (fid != FunctionId.OTHER.ordinal()) {
+ throw new IllegalStateException("Expected FID for OTHER, found: " + fid);
+ }
+ return (Exception) deserialize(fid, buffer, length - FID_CODE_SIZE);
+ }
+
+ @Override
+ public byte[] serializeObject(Object object) throws Exception {
+ if (object instanceof Function) {
+ Function fn = (Function) object;
+ return serialize(object, (byte) fn.getFunctionId().ordinal());
+ } else {
+ return serialize(object, (byte) FunctionId.OTHER.ordinal());
+ }
+ }
+
+ @Override
+ public byte[] serializeException(Exception object) throws Exception {
+ return serialize(object, (byte) FunctionId.OTHER.ordinal());
+ }
+
+ private byte[] serialize(Object object, byte fid) throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ baos.write(fid);
+ serialize(baos, object, fid);
+ JavaSerializationBasedPayloadSerializerDeserializer.serialize(baos, object);
+ baos.close();
+ return baos.toByteArray();
+ }
+
+ private void serialize(OutputStream out, Object object, byte fid) throws Exception {
+ switch (FunctionId.values()[fid]) {
+ case REGISTER_PARTITION_PROVIDER:
+ RegisterPartitionProviderFunction.serialize(out, object);
+ return;
+
+ case REGISTER_PARTITION_REQUEST:
+ RegisterPartitionRequestFunction.serialize(out, object);
+ return;
+
+ case REPORT_PARTITION_AVAILABILITY:
+ ReportPartitionAvailabilityFunction.serialize(out, object);
+ return;
+ }
+ JavaSerializationBasedPayloadSerializerDeserializer.serialize(out, object);
+ }
+
+ private Object deserialize(byte fid, ByteBuffer buffer, int length) throws Exception {
+ switch (FunctionId.values()[fid]) {
+ case REGISTER_PARTITION_PROVIDER:
+ return RegisterPartitionProviderFunction.deserialize(buffer, length);
+
+ case REGISTER_PARTITION_REQUEST:
+ return RegisterPartitionRequestFunction.deserialize(buffer, length);
+
+ case REPORT_PARTITION_AVAILABILITY:
+ return ReportPartitionAvailabilityFunction.deserialize(buffer, length);
+ }
+
+ return javaSerde.deserializeObject(buffer, length);
+ }
+ }
+
+ private static PartitionId readPartitionId(DataInputStream dis) throws IOException {
+ long jobId = dis.readLong();
+ int cdid = dis.readInt();
+ int senderIndex = dis.readInt();
+ int receiverIndex = dis.readInt();
+ PartitionId pid = new PartitionId(new JobId(jobId), new ConnectorDescriptorId(cdid), senderIndex, receiverIndex);
+ return pid;
+ }
+
+ private static void writePartitionId(DataOutputStream dos, PartitionId pid) throws IOException {
+ dos.writeLong(pid.getJobId().getId());
+ dos.writeInt(pid.getConnectorDescriptorId().getId());
+ dos.writeInt(pid.getSenderIndex());
+ dos.writeInt(pid.getReceiverIndex());
+ }
+
+ private static TaskAttemptId readTaskAttemptId(DataInputStream dis) throws IOException {
+ int odid = dis.readInt();
+ int aid = dis.readInt();
+ int partition = dis.readInt();
+ int attempt = dis.readInt();
+ TaskAttemptId taId = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(odid), aid),
+ partition), attempt);
+ return taId;
+ }
+
+ private static void writeTaskAttemptId(DataOutputStream dos, TaskAttemptId taId) throws IOException {
+ TaskId tid = taId.getTaskId();
+ ActivityId aid = tid.getActivityId();
+ OperatorDescriptorId odId = aid.getOperatorDescriptorId();
+ dos.writeInt(odId.getId());
+ dos.writeInt(aid.getLocalId());
+ dos.writeInt(tid.getPartition());
+ dos.writeInt(taId.getAttempt());
+ }
+
+ private static PartitionState readPartitionState(DataInputStream dis) throws IOException {
+ PartitionState state = PartitionState.values()[dis.readInt()];
+ return state;
+ }
+
+ private static void writePartitionState(DataOutputStream dos, PartitionState state) throws IOException {
+ dos.writeInt(state.ordinal());
+ }
+
+ private static NetworkAddress readNetworkAddress(DataInputStream dis) throws IOException {
+ int bLen = dis.readInt();
+ byte[] ipAddress = new byte[bLen];
+ dis.read(ipAddress);
+ int port = dis.readInt();
+ NetworkAddress networkAddress = new NetworkAddress(ipAddress, port);
+ return networkAddress;
+ }
+
+ private static void writeNetworkAddress(DataOutputStream dos, NetworkAddress networkAddress) throws IOException {
+ byte[] ipAddress = networkAddress.getIpAddress();
+ dos.writeInt(ipAddress.length);
+ dos.write(ipAddress);
+ dos.writeInt(networkAddress.getPort());
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
new file mode 100644
index 0000000..a0dabdd
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+
+public class ClusterControllerRemoteProxy implements IClusterController {
+ private final IIPCHandle ipcHandle;
+
+ public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
+ }
+
+ @Override
+ public void registerNode(NodeRegistration reg) throws Exception {
+ CCNCFunctions.RegisterNodeFunction fn = new CCNCFunctions.RegisterNodeFunction(reg);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void unregisterNode(String nodeId) throws Exception {
+ CCNCFunctions.UnregisterNodeFunction fn = new CCNCFunctions.UnregisterNodeFunction(
+ nodeId);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+ throws Exception {
+ CCNCFunctions.NotifyTaskCompleteFunction fn = new CCNCFunctions.NotifyTaskCompleteFunction(
+ jobId, taskId, nodeId, statistics);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
+ CCNCFunctions.NotifyTaskFailureFunction fn = new CCNCFunctions.NotifyTaskFailureFunction(
+ jobId, taskId, nodeId, details);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
+ CCNCFunctions.NotifyJobletCleanupFunction fn = new CCNCFunctions.NotifyJobletCleanupFunction(
+ jobId, nodeId);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
+ CCNCFunctions.NodeHeartbeatFunction fn = new CCNCFunctions.NodeHeartbeatFunction(id,
+ hbData);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
+ CCNCFunctions.ReportProfileFunction fn = new CCNCFunctions.ReportProfileFunction(id,
+ profiles);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
+ CCNCFunctions.RegisterPartitionProviderFunction fn = new CCNCFunctions.RegisterPartitionProviderFunction(
+ partitionDescriptor);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
+ CCNCFunctions.RegisterPartitionRequestFunction fn = new CCNCFunctions.RegisterPartitionRequestFunction(
+ partitionRequest);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception {
+ CCNCFunctions.ApplicationStateChangeResponseFunction fn = new CCNCFunctions.ApplicationStateChangeResponseFunction(
+ nodeId, appName, status);
+ ipcHandle.send(-1, fn, null);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
new file mode 100644
index 0000000..8176cb7
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+
+public class NodeControllerRemoteProxy implements INodeController {
+ private final IIPCHandle ipcHandle;
+
+ public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
+ }
+
+ @Override
+ public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
+ CCNCFunctions.StartTasksFunction stf = new CCNCFunctions.StartTasksFunction(appName, jobId, planBytes,
+ taskDescriptors, connectorPolicies);
+ ipcHandle.send(-1, stf, null);
+ }
+
+ @Override
+ public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
+ CCNCFunctions.AbortTasksFunction atf = new CCNCFunctions.AbortTasksFunction(jobId, tasks);
+ ipcHandle.send(-1, atf, null);
+ }
+
+ @Override
+ public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
+ CCNCFunctions.CleanupJobletFunction cjf = new CCNCFunctions.CleanupJobletFunction(jobId, status);
+ ipcHandle.send(-1, cjf, null);
+ }
+
+ @Override
+ public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
+ throws Exception {
+ CCNCFunctions.CreateApplicationFunction caf = new CCNCFunctions.CreateApplicationFunction(appName, deployHar,
+ serializedDistributedState);
+ ipcHandle.send(-1, caf, null);
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+ CCNCFunctions.DestroyApplicationFunction daf = new CCNCFunctions.DestroyApplicationFunction(appName);
+ ipcHandle.send(-1, daf, null);
+ }
+
+ @Override
+ public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
+ CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = new CCNCFunctions.ReportPartitionAvailabilityFunction(
+ pid, networkAddress);
+ ipcHandle.send(-1, rpaf, null);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
new file mode 100644
index 0000000..9fc0916
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionDescriptor.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class PartitionDescriptor implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionId pid;
+
+ private final String nodeId;
+
+ private final TaskAttemptId producingTaskAttemptId;
+
+ private final boolean reusable;
+
+ private PartitionState state;
+
+ public PartitionDescriptor(PartitionId pid, String nodeId, TaskAttemptId producingTaskAttemptId, boolean reusable) {
+ this.pid = pid;
+ this.nodeId = nodeId;
+ this.producingTaskAttemptId = producingTaskAttemptId;
+ this.reusable = reusable;
+ }
+
+ public PartitionId getPartitionId() {
+ return pid;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public TaskAttemptId getProducingTaskAttemptId() {
+ return producingTaskAttemptId;
+ }
+
+ public PartitionState getState() {
+ return state;
+ }
+
+ public void setState(PartitionState state) {
+ this.state = state;
+ }
+
+ public boolean isReusable() {
+ return reusable;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + pid + ":" + nodeId + ":" + producingTaskAttemptId + (reusable ? "reusable" : "non-reusable") + " "
+ + state + "]";
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionRequest.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionRequest.java
new file mode 100644
index 0000000..ca34501
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class PartitionRequest implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionId pid;
+
+ private final String requestingNodeId;
+
+ private final TaskAttemptId requestingTaskAttemptId;
+
+ private final PartitionState minState;
+
+ public PartitionRequest(PartitionId pid, String requestingNodeId, TaskAttemptId requestingTaskAttemptId,
+ PartitionState minState) {
+ this.pid = pid;
+ this.requestingNodeId = requestingNodeId;
+ this.requestingTaskAttemptId = requestingTaskAttemptId;
+ this.minState = minState;
+ }
+
+ public PartitionId getPartitionId() {
+ return pid;
+ }
+
+ public String getNodeId() {
+ return requestingNodeId;
+ }
+
+ public TaskAttemptId getRequestingTaskAttemptId() {
+ return requestingTaskAttemptId;
+ }
+
+ public PartitionState getMinimumState() {
+ return minState;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + pid + ":" + requestingNodeId + ":" + requestingTaskAttemptId + ":" + minState + "]";
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
new file mode 100644
index 0000000..edde0b8
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/PartitionState.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+public enum PartitionState {
+ STARTED,
+ COMMITTED;
+
+ public boolean isAtLeast(PartitionState minState) {
+ switch (this) {
+ case COMMITTED:
+ return true;
+
+ case STARTED:
+ return minState == STARTED;
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
new file mode 100644
index 0000000..f6d1f78
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/TaskAttemptDescriptor.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+
+public class TaskAttemptDescriptor implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final TaskAttemptId taId;
+
+ private final int nPartitions;
+
+ private final int[] nInputPartitions;
+
+ private final int[] nOutputPartitions;
+
+ private NetworkAddress[][] inputPartitionLocations;
+
+ public TaskAttemptDescriptor(TaskAttemptId taId, int nPartitions, int[] nInputPartitions, int[] nOutputPartitions) {
+ this.taId = taId;
+ this.nPartitions = nPartitions;
+ this.nInputPartitions = nInputPartitions;
+ this.nOutputPartitions = nOutputPartitions;
+ }
+
+ public TaskAttemptId getTaskAttemptId() {
+ return taId;
+ }
+
+ public int getPartitionCount() {
+ return nPartitions;
+ }
+
+ public int[] getInputPartitionCounts() {
+ return nInputPartitions;
+ }
+
+ public int[] getOutputPartitionCounts() {
+ return nOutputPartitions;
+ }
+
+ public void setInputPartitionLocations(NetworkAddress[][] inputPartitionLocations) {
+ this.inputPartitionLocations = inputPartitionLocations;
+ }
+
+ public NetworkAddress[][] getInputPartitionLocations() {
+ return inputPartitionLocations;
+ }
+
+ @Override
+ public String toString() {
+ return "TaskAttemptDescriptor[taId = " + taId + ", nPartitions = " + nPartitions + ", nInputPartitions = "
+ + Arrays.toString(nInputPartitions) + ", nOutputPartitions = " + Arrays.toString(nOutputPartitions)
+ + "]";
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
new file mode 100644
index 0000000..8529814
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/Counter.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.counters;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounter;
+
+public class Counter implements ICounter {
+ private final String name;
+ private final AtomicLong counter;
+
+ public Counter(String name) {
+ this.name = name;
+ counter = new AtomicLong();
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public long update(long delta) {
+ return counter.addAndGet(delta);
+ }
+
+ @Override
+ public long set(long value) {
+ long oldValue = counter.get();
+ counter.set(value);
+ return oldValue;
+ }
+
+ @Override
+ public long get() {
+ return counter.get();
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
new file mode 100644
index 0000000..f8fba7a
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.counters;
+
+import java.io.Serializable;
+
+public class MultiResolutionEventProfiler implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final int[] times;
+
+ private long offset;
+
+ private int ptr;
+
+ private int resolution;
+
+ private int eventCounter;
+
+ public MultiResolutionEventProfiler(int nSamples) {
+ times = new int[nSamples];
+ offset = -1;
+ ptr = 0;
+ resolution = 1;
+ eventCounter = 0;
+ }
+
+ public void reportEvent() {
+ ++eventCounter;
+ if (eventCounter % resolution != 0) {
+ return;
+ }
+ if (ptr >= times.length) {
+ compact();
+ return;
+ }
+ eventCounter = 0;
+ long time = System.currentTimeMillis();
+ if (offset < 0) {
+ offset = time;
+ }
+ int value = (int) (time - offset);
+ times[ptr++] = value;
+ }
+
+ private void compact() {
+ for (int i = 1; i < ptr / 2; ++i) {
+ times[i] = times[i * 2];
+ }
+ resolution <<= 1;
+ ptr >>= 1;
+ }
+
+ public int getResolution() {
+ return resolution;
+ }
+
+ public int getCount() {
+ return ptr;
+ }
+
+ public int[] getSamples() {
+ return times;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java
new file mode 100644
index 0000000..b4e3619
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/AbstractProfile.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.om;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public abstract class AbstractProfile implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ protected final Map<String, Long> counters;
+
+ public AbstractProfile() {
+ counters = new HashMap<String, Long>();
+ }
+
+ public Map<String, Long> getCounters() {
+ return counters;
+ }
+
+ public abstract JSONObject toJSON() throws JSONException;
+
+ protected void populateCounters(JSONObject jo) throws JSONException {
+ JSONArray countersObj = new JSONArray();
+ for (Map.Entry<String, Long> e : counters.entrySet()) {
+ JSONObject jpe = new JSONObject();
+ jpe.put("name", e.getKey());
+ jpe.put("value", e.getValue());
+ countersObj.put(jpe);
+ }
+ jo.put("counters", countersObj);
+ }
+
+ protected void merge(AbstractProfile profile) {
+ counters.putAll(profile.counters);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
new file mode 100644
index 0000000..46a964a
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobProfile.java
@@ -0,0 +1,57 @@
+package edu.uci.ics.hyracks.control.common.job.profiling.om;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class JobProfile extends AbstractProfile {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final Map<String, JobletProfile> jobletProfiles;
+
+ public JobProfile(JobId jobId) {
+ this.jobId = jobId;
+ jobletProfiles = new HashMap<String, JobletProfile>();
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public Map<String, JobletProfile> getJobletProfiles() {
+ return jobletProfiles;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject json = new JSONObject();
+
+ json.put("job-id", jobId.toString());
+ populateCounters(json);
+ JSONArray jobletsArray = new JSONArray();
+ for (JobletProfile p : jobletProfiles.values()) {
+ jobletsArray.put(p.toJSON());
+ }
+ json.put("joblets", jobletsArray);
+
+ return json;
+ }
+
+ public void merge(JobProfile other) {
+ super.merge(this);
+ for (JobletProfile jp : other.jobletProfiles.values()) {
+ if (jobletProfiles.containsKey(jp.getNodeId())) {
+ jobletProfiles.get(jp.getNodeId()).merge(jp);
+ } else {
+ jobletProfiles.put(jp.getNodeId(), jp);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java
new file mode 100644
index 0000000..0c60006
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/JobletProfile.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.om;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+
+public class JobletProfile extends AbstractProfile {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+
+ private final Map<TaskAttemptId, TaskProfile> taskProfiles;
+
+ public JobletProfile(String nodeId) {
+ this.nodeId = nodeId;
+ taskProfiles = new HashMap<TaskAttemptId, TaskProfile>();
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public Map<TaskAttemptId, TaskProfile> getTaskProfiles() {
+ return taskProfiles;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject json = new JSONObject();
+
+ json.put("node-id", nodeId.toString());
+ populateCounters(json);
+ JSONArray tasks = new JSONArray();
+ for (TaskProfile p : taskProfiles.values()) {
+ tasks.put(p.toJSON());
+ }
+ json.put("tasks", tasks);
+
+ return json;
+ }
+
+ public void merge(JobletProfile jp) {
+ super.merge(this);
+ for (TaskProfile tp : jp.taskProfiles.values()) {
+ if (taskProfiles.containsKey(tp.getTaskId())) {
+ taskProfiles.get(tp.getTaskId()).merge(tp);
+ } else {
+ taskProfiles.put(tp.getTaskId(), tp);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
new file mode 100644
index 0000000..ef61796
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/PartitionProfile.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.om;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
+
+public class PartitionProfile implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionId pid;
+
+ private final long openTime;
+
+ private final long closeTime;
+
+ private final MultiResolutionEventProfiler mrep;
+
+ public PartitionProfile(PartitionId pid, long openTime, long closeTime, MultiResolutionEventProfiler mrep) {
+ this.pid = pid;
+ this.openTime = openTime;
+ this.closeTime = closeTime;
+ this.mrep = mrep;
+ }
+
+ public PartitionId getPartitionId() {
+ return pid;
+ }
+
+ public long getOpenTime() {
+ return openTime;
+ }
+
+ public long getCloseTime() {
+ return closeTime;
+ }
+
+ public MultiResolutionEventProfiler getSamples() {
+ return mrep;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
new file mode 100644
index 0000000..2116f61
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.om;
+
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
+
+public class TaskProfile extends AbstractProfile {
+ private static final long serialVersionUID = 1L;
+
+ private final TaskAttemptId taskAttemptId;
+
+ private final Map<PartitionId, PartitionProfile> partitionSendProfile;
+
+ public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile) {
+ this.taskAttemptId = taskAttemptId;
+ this.partitionSendProfile = partitionSendProfile;
+ }
+
+ public TaskAttemptId getTaskId() {
+ return taskAttemptId;
+ }
+
+ public Map<PartitionId, PartitionProfile> getPartitionSendProfile() {
+ return partitionSendProfile;
+ }
+
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject json = new JSONObject();
+
+ json.put("activity-id", taskAttemptId.getTaskId().getActivityId().toString());
+ json.put("partition", taskAttemptId.getTaskId().getPartition());
+ json.put("attempt", taskAttemptId.getAttempt());
+ if (partitionSendProfile != null) {
+ JSONArray pspArray = new JSONArray();
+ for (PartitionProfile pp : partitionSendProfile.values()) {
+ JSONObject ppObj = new JSONObject();
+ PartitionId pid = pp.getPartitionId();
+ JSONObject pidObj = new JSONObject();
+ pidObj.put("job-id", pid.getJobId());
+ pidObj.put("connector-id", pid.getConnectorDescriptorId());
+ pidObj.put("sender-index", pid.getSenderIndex());
+ pidObj.put("receiver-index", pid.getReceiverIndex());
+ ppObj.put("partition-id", pidObj);
+ ppObj.put("open-time", pp.getOpenTime());
+ ppObj.put("close-time", pp.getCloseTime());
+ MultiResolutionEventProfiler samples = pp.getSamples();
+ ppObj.put("offset", samples.getOffset());
+ int resolution = samples.getResolution();
+ int sampleCount = samples.getCount();
+ JSONArray ftA = new JSONArray();
+ int[] ft = samples.getSamples();
+ for (int i = 0; i < sampleCount; ++i) {
+ ftA.put(ft[i]);
+ }
+ ppObj.put("frame-times", ftA);
+ ppObj.put("resolution", resolution);
+ pspArray.put(ppObj);
+ }
+ json.put("partition-send-profile", pspArray);
+ }
+ populateCounters(json);
+
+ return json;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/logs/LogFile.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/logs/LogFile.java
new file mode 100644
index 0000000..1521aa0
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/logs/LogFile.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.logs;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+
+import org.json.JSONObject;
+
+public class LogFile {
+ private final File root;
+
+ private PrintWriter out;
+
+ public LogFile(File root) {
+ this.root = root;
+ }
+
+ public void open() throws Exception {
+ root.mkdirs();
+ out = new PrintWriter(new FileOutputStream(new File(root, String.valueOf(System.currentTimeMillis()) + ".log"),
+ true));
+ }
+
+ public void log(JSONObject object) throws Exception {
+ out.println(object.toString(1));
+ out.flush();
+ }
+
+ public void close() {
+ out.flush();
+ out.close();
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/service/AbstractService.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/service/AbstractService.java
new file mode 100644
index 0000000..4f9cb92
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/service/AbstractService.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.service;
+
+public abstract class AbstractService implements IService {
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/service/IService.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/service/IService.java
new file mode 100644
index 0000000..4e81066
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/service/IService.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.service;
+
+public interface IService {
+ public void start() throws Exception;
+
+ public void stop() throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java
new file mode 100644
index 0000000..4ead100
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/AbstractWork.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.work;
+
+import java.util.logging.Level;
+
+public abstract class AbstractWork implements Runnable {
+ public Level logLevel() {
+ return Level.INFO;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
new file mode 100644
index 0000000..7eb4ff6
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/FutureValue.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.work;
+
+public class FutureValue<T> implements IResultCallback<T> {
+ private boolean done;
+
+ private T value;
+
+ private Exception e;
+
+ public FutureValue() {
+ done = false;
+ value = null;
+ e = null;
+ }
+
+ @Override
+ public synchronized void setValue(T value) {
+ done = true;
+ this.value = value;
+ e = null;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void setException(Exception e) {
+ done = true;
+ this.e = e;
+ value = null;
+ notifyAll();
+ }
+
+ public synchronized void reset() {
+ done = false;
+ value = null;
+ e = null;
+ notifyAll();
+ }
+
+ public synchronized T get() throws Exception {
+ while (!done) {
+ wait();
+ }
+ if (e != null) {
+ throw e;
+ }
+ return value;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IPCResponder.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IPCResponder.java
new file mode 100644
index 0000000..dcea864
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IPCResponder.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.control.common.work;
+
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.exceptions.IPCException;
+
+public class IPCResponder<T> implements IResultCallback<T> {
+ private final IIPCHandle handle;
+
+ private final long rmid;
+
+ public IPCResponder(IIPCHandle handle, long rmid) {
+ this.handle = handle;
+ this.rmid = rmid;
+ }
+
+ @Override
+ public void setValue(T result) {
+ try {
+ handle.send(rmid, result, null);
+ } catch (IPCException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void setException(Exception e) {
+ try {
+ handle.send(rmid, null, e);
+ } catch (IPCException e1) {
+ e1.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IResultCallback.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IResultCallback.java
new file mode 100644
index 0000000..80c3d76
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/IResultCallback.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.work;
+
+public interface IResultCallback<T> {
+ public void setValue(T result);
+
+ public void setException(Exception e);
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java
new file mode 100644
index 0000000..e94086f
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/SynchronizableWork.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.work;
+
+public abstract class SynchronizableWork extends AbstractWork {
+ private boolean done;
+
+ private Exception e;
+
+ protected abstract void doRun() throws Exception;
+
+ public void init() {
+ done = false;
+ e = null;
+ }
+
+ @Override
+ public final void run() {
+ try {
+ doRun();
+ } catch (Exception e) {
+ this.e = e;
+ } finally {
+ synchronized (this) {
+ done = true;
+ notifyAll();
+ }
+ }
+ }
+
+ public final synchronized void sync() throws Exception {
+ while (!done) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw e;
+ }
+ }
+ if (e != null) {
+ throw e;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
new file mode 100644
index 0000000..36f0c49
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/work/WorkQueue.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.work;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+public class WorkQueue {
+ private static final Logger LOGGER = Logger.getLogger(WorkQueue.class.getName());
+
+ private final LinkedBlockingQueue<AbstractWork> queue;
+ private final WorkerThread thread;
+ private final Semaphore stopSemaphore;
+ private boolean stopped;
+ private final AtomicInteger enqueueCount;
+ private final AtomicInteger dequeueCount;
+
+ public WorkQueue() {
+ queue = new LinkedBlockingQueue<AbstractWork>();
+ thread = new WorkerThread();
+ stopSemaphore = new Semaphore(1);
+ enqueueCount = new AtomicInteger();
+ dequeueCount = new AtomicInteger();
+ }
+
+ public void start() throws HyracksException {
+ stopped = false;
+ enqueueCount.set(0);
+ dequeueCount.set(0);
+ try {
+ stopSemaphore.acquire();
+ } catch (InterruptedException e) {
+ throw new HyracksException(e);
+ }
+ thread.start();
+ }
+
+ public void stop() throws HyracksException {
+ synchronized (this) {
+ stopped = true;
+ }
+ schedule(new AbstractWork() {
+ @Override
+ public void run() {
+ }
+ });
+ try {
+ stopSemaphore.acquire();
+ } catch (InterruptedException e) {
+ throw new HyracksException(e);
+ }
+ }
+
+ public void schedule(AbstractWork event) {
+ enqueueCount.incrementAndGet();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Enqueue: " + enqueueCount);
+ }
+ if (LOGGER.isLoggable(event.logLevel())) {
+ LOGGER.log(event.logLevel(), "Scheduling: " + event);
+ }
+ queue.offer(event);
+ }
+
+ public void scheduleAndSync(SynchronizableWork sRunnable) throws Exception {
+ schedule(sRunnable);
+ sRunnable.sync();
+ }
+
+ private class WorkerThread extends Thread {
+ WorkerThread() {
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ AbstractWork r;
+ while (true) {
+ synchronized (WorkQueue.this) {
+ if (stopped) {
+ return;
+ }
+ }
+ try {
+ r = queue.take();
+ } catch (InterruptedException e) {
+ continue;
+ }
+ dequeueCount.incrementAndGet();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Dequeue: " + dequeueCount + "/" + enqueueCount);
+ }
+ try {
+ if (LOGGER.isLoggable(r.logLevel())) {
+ LOGGER.log(r.logLevel(), "Executing: " + r);
+ }
+ r.run();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ } finally {
+ stopSemaphore.release();
+ }
+ }
+ }
+}
\ No newline at end of file