add missing new files in merge hyracks_dev_next r847:977
git-svn-id: https://hyracks.googlecode.com/svn/branches/aggregators_dev_next@979 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
new file mode 100644
index 0000000..a7f324d
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class HyracksClientInterfaceFunctions {
+ public enum FunctionId {
+ GET_CLUSTER_CONTROLLER_INFO,
+ CREATE_APPLICATION,
+ START_APPLICATION,
+ DESTROY_APPLICATION,
+ CREATE_JOB,
+ GET_JOB_STATUS,
+ START_JOB,
+ WAIT_FOR_COMPLETION,
+ GET_NODE_CONTROLLERS_INFO
+ }
+
+ public abstract static class Function implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public abstract FunctionId getFunctionId();
+ }
+
+ public static class GetClusterControllerInfoFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_CLUSTER_CONTROLLER_INFO;
+ }
+ }
+
+ public static class CreateApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public CreateApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CREATE_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class StartApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public StartApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class DestroyApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public DestroyApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class CreateJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final byte[] jobSpec;
+ private final EnumSet<JobFlag> jobFlags;
+
+ public CreateJobFunction(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) {
+ this.appName = appName;
+ this.jobSpec = jobSpec;
+ this.jobFlags = jobFlags;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CREATE_JOB;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public byte[] getJobSpec() {
+ return jobSpec;
+ }
+
+ public EnumSet<JobFlag> getJobFlags() {
+ return jobFlags;
+ }
+ }
+
+ public static class GetJobStatusFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public GetJobStatusFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_JOB_STATUS;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
+ public static class StartJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public StartJobFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_JOB;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
+ public static class WaitForCompletionFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public WaitForCompletionFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.WAIT_FOR_COMPLETION;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
+ public static class GetNodeControllersInfoFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_NODE_CONTROLLERS_INFO;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
new file mode 100644
index 0000000..f74d06e
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface {
+ private final IIPCHandle ipcHandle;
+
+ public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
+ }
+
+ @Override
+ public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+ return (ClusterControllerInfo) sync.call(ipcHandle, gccif);
+ }
+
+ @Override
+ public void createApplication(String appName) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.CreateApplicationFunction caf = new HyracksClientInterfaceFunctions.CreateApplicationFunction(
+ appName);
+ sync.call(ipcHandle, caf);
+ }
+
+ @Override
+ public void startApplication(String appName) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.StartApplicationFunction saf = new HyracksClientInterfaceFunctions.StartApplicationFunction(
+ appName);
+ sync.call(ipcHandle, saf);
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = new HyracksClientInterfaceFunctions.DestroyApplicationFunction(
+ appName);
+ sync.call(ipcHandle, daf);
+ }
+
+ @Override
+ public JobId createJob(String appName, byte[] jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.CreateJobFunction cjf = new HyracksClientInterfaceFunctions.CreateJobFunction(
+ appName, jobSpec, jobFlags);
+ return (JobId) sync.call(ipcHandle, cjf);
+ }
+
+ @Override
+ public JobStatus getJobStatus(JobId jobId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new HyracksClientInterfaceFunctions.GetJobStatusFunction(
+ jobId);
+ return (JobStatus) sync.call(ipcHandle, gjsf);
+ }
+
+ @Override
+ public void startJob(JobId jobId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
+ jobId);
+ sync.call(ipcHandle, sjf);
+ }
+
+ @Override
+ public void waitForCompletion(JobId jobId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
+ jobId);
+ sync.call(ipcHandle, wfcf);
+ }
+
+ @Override
+ public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+ SyncRMI sync = new SyncRMI();
+ HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+ return (Map<String, NodeControllerInfo>) sync.call(ipcHandle, gncif);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
new file mode 100644
index 0000000..1b38cb9
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.FileEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+
+/**
+ * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
+ * Controller.
+ *
+ * @author vinayakb
+ *
+ */
+public final class HyracksConnection implements IHyracksClientConnection {
+ private final String ccHost;
+
+ private final IPCSystem ipc;
+
+ private final IHyracksClientInterface hci;
+
+ private final ClusterControllerInfo ccInfo;
+
+ /**
+ * Constructor to create a connection to the Hyracks Cluster Controller.
+ *
+ * @param ccHost
+ * Host name (or IP Address) where the Cluster Controller can be
+ * reached.
+ * @param ccPort
+ * Port to reach the Hyracks Cluster Controller at the specified
+ * host name.
+ * @throws Exception
+ */
+ public HyracksConnection(String ccHost, int ccPort) throws Exception {
+ this.ccHost = ccHost;
+ ipc = new IPCSystem(new InetSocketAddress(0));
+ ipc.start();
+ IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
+ this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle);
+ ccInfo = hci.getClusterControllerInfo();
+ }
+
+ @Override
+ public void createApplication(String appName, File harFile) throws Exception {
+ hci.createApplication(appName);
+ if (harFile != null) {
+ HttpClient hc = new DefaultHttpClient();
+ HttpPut put = new HttpPut("http://" + ccHost + ":" + ccInfo.getWebPort() + "/applications/" + appName);
+ put.setEntity(new FileEntity(harFile, "application/octet-stream"));
+ HttpResponse response = hc.execute(put);
+ if (response.getStatusLine().getStatusCode() != 200) {
+ hci.destroyApplication(appName);
+ throw new HyracksException(response.getStatusLine().toString());
+ }
+ }
+ hci.startApplication(appName);
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+ hci.destroyApplication(appName);
+ }
+
+ @Override
+ public JobId createJob(String appName, JobSpecification jobSpec) throws Exception {
+ return createJob(appName, jobSpec, EnumSet.noneOf(JobFlag.class));
+ }
+
+ @Override
+ public JobId createJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ return hci.createJob(appName, JavaSerializationUtils.serialize(jobSpec), jobFlags);
+ }
+
+ @Override
+ public JobStatus getJobStatus(JobId jobId) throws Exception {
+ return hci.getJobStatus(jobId);
+ }
+
+ @Override
+ public void start(JobId jobId) throws Exception {
+ hci.startJob(jobId);
+ }
+
+ @Override
+ public void waitForCompletion(JobId jobId) throws Exception {
+ hci.waitForCompletion(jobId);
+ }
+
+ @Override
+ public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
+ return hci.getNodeControllersInfo();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITypeTraits.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITypeTraits.java
new file mode 100644
index 0000000..4a6f826
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITypeTraits.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITypeTraits extends Serializable {
+ public boolean isFixedLength();
+
+ public int getFixedLength();
+}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java
new file mode 100644
index 0000000..b3aa406
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ipc/HyracksClientInterfaceDelegateIPCI.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.ipc;
+
+import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
+import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+
+public class HyracksClientInterfaceDelegateIPCI implements IIPCI {
+ private final IHyracksClientInterface hci;
+
+ public HyracksClientInterfaceDelegateIPCI(IHyracksClientInterface hci) {
+ this.hci = hci;
+ }
+
+ @Override
+ public Object call(IIPCHandle caller, Object req) throws Exception {
+ HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) req;
+ switch (fn.getFunctionId()) {
+ case GET_CLUSTER_CONTROLLER_INFO: {
+ return hci.getClusterControllerInfo();
+ }
+
+ case CREATE_APPLICATION: {
+ HyracksClientInterfaceFunctions.CreateApplicationFunction caf = (HyracksClientInterfaceFunctions.CreateApplicationFunction) fn;
+ hci.createApplication(caf.getAppName());
+ return null;
+ }
+
+ case START_APPLICATION: {
+ HyracksClientInterfaceFunctions.StartApplicationFunction saf = (HyracksClientInterfaceFunctions.StartApplicationFunction) fn;
+ hci.startApplication(saf.getAppName());
+ return null;
+ }
+
+ case DESTROY_APPLICATION: {
+ HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = (HyracksClientInterfaceFunctions.DestroyApplicationFunction) fn;
+ hci.destroyApplication(daf.getAppName());
+ return null;
+ }
+
+ case CREATE_JOB: {
+ HyracksClientInterfaceFunctions.CreateJobFunction cjf = (HyracksClientInterfaceFunctions.CreateJobFunction) fn;
+ return hci.createJob(cjf.getAppName(), cjf.getJobSpec(), cjf.getJobFlags());
+ }
+
+ case GET_JOB_STATUS: {
+ HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+ return hci.getJobStatus(gjsf.getJobId());
+ }
+
+ case START_JOB: {
+ HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+ hci.startJob(sjf.getJobId());
+ return null;
+ }
+
+ case WAIT_FOR_COMPLETION: {
+ HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+ hci.waitForCompletion(wfcf.getJobId());
+ return null;
+ }
+
+ case GET_NODE_CONTROLLERS_INFO: {
+ return hci.getNodeControllersInfo();
+ }
+ }
+ throw new IllegalArgumentException("Unknown function " + fn.getFunctionId());
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
new file mode 100644
index 0000000..3c8f7d0
--- /dev/null
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ApplicationCreateWork.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.io.IOException;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+import edu.uci.ics.hyracks.control.common.work.FutureValue;
+
+public class ApplicationCreateWork extends AbstractWork {
+ private final ClusterControllerService ccs;
+ private final String appName;
+ private FutureValue<Object> fv;
+
+ public ApplicationCreateWork(ClusterControllerService ccs, String appName, FutureValue<Object> fv) {
+ this.ccs = ccs;
+ this.appName = appName;
+ this.fv = fv;
+ }
+
+ @Override
+ public void run() {
+ Map<String, CCApplicationContext> applications = ccs.getApplicationMap();
+ if (applications.containsKey(appName)) {
+ fv.setException(new HyracksException("Duplicate application with name: " + appName + " being created."));
+ }
+ CCApplicationContext appCtx;
+ try {
+ appCtx = new CCApplicationContext(ccs.getServerContext(), ccs.getCCContext(), appName);
+ } catch (IOException e) {
+ fv.setException(e);
+ return;
+ }
+ applications.put(appName, appCtx);
+ fv.setValue(null);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java
new file mode 100644
index 0000000..e35f962
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerDelegateIPCI.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerFunctions.Function;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+
+public class ClusterControllerDelegateIPCI implements IIPCI {
+ private final IClusterController cc;
+
+ public ClusterControllerDelegateIPCI(IClusterController cc) {
+ this.cc = cc;
+ }
+
+ @Override
+ public Object call(IIPCHandle caller, Object req) throws Exception {
+ ClusterControllerFunctions.Function fn = (Function) req;
+ switch (fn.getFunctionId()) {
+ case REGISTER_NODE: {
+ ClusterControllerFunctions.RegisterNodeFunction rnf = (ClusterControllerFunctions.RegisterNodeFunction) fn;
+ return cc.registerNode(rnf.getNodeRegistration());
+ }
+
+ case UNREGISTER_NODE: {
+ ClusterControllerFunctions.UnregisterNodeFunction unf = (ClusterControllerFunctions.UnregisterNodeFunction) fn;
+ cc.unregisterNode(unf.getNodeId());
+ return null;
+ }
+
+ case NODE_HEARTBEAT: {
+ ClusterControllerFunctions.NodeHeartbeatFunction nhf = (ClusterControllerFunctions.NodeHeartbeatFunction) fn;
+ cc.nodeHeartbeat(nhf.getNodeId(), nhf.getHeartbeatData());
+ return null;
+ }
+
+ case NOTIFY_JOBLET_CLEANUP: {
+ ClusterControllerFunctions.NotifyJobletCleanupFunction njcf = (ClusterControllerFunctions.NotifyJobletCleanupFunction) fn;
+ cc.notifyJobletCleanup(njcf.getJobId(), njcf.getNodeId());
+ return null;
+ }
+
+ case REPORT_PROFILE: {
+ ClusterControllerFunctions.ReportProfileFunction rpf = (ClusterControllerFunctions.ReportProfileFunction) fn;
+ cc.reportProfile(rpf.getNodeId(), rpf.getProfiles());
+ return null;
+ }
+
+ case NOTIFY_TASK_COMPLETE: {
+ ClusterControllerFunctions.NotifyTaskCompleteFunction ntcf = (ClusterControllerFunctions.NotifyTaskCompleteFunction) fn;
+ cc.notifyTaskComplete(ntcf.getJobId(), ntcf.getTaskId(), ntcf.getNodeId(), ntcf.getStatistics());
+ return null;
+ }
+ case NOTIFY_TASK_FAILURE: {
+ ClusterControllerFunctions.NotifyTaskFailureFunction ntff = (ClusterControllerFunctions.NotifyTaskFailureFunction) fn;
+ cc.notifyTaskFailure(ntff.getJobId(), ntff.getTaskId(), ntff.getDetails(), ntff.getDetails());
+ return null;
+ }
+
+ case REGISTER_PARTITION_PROVIDER: {
+ ClusterControllerFunctions.RegisterPartitionProviderFunction rppf = (ClusterControllerFunctions.RegisterPartitionProviderFunction) fn;
+ cc.registerPartitionProvider(rppf.getPartitionDescriptor());
+ return null;
+ }
+
+ case REGISTER_PARTITION_REQUEST: {
+ ClusterControllerFunctions.RegisterPartitionRequestFunction rprf = (ClusterControllerFunctions.RegisterPartitionRequestFunction) fn;
+ cc.registerPartitionRequest(rprf.getPartitionRequest());
+ return null;
+ }
+ }
+ throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
new file mode 100644
index 0000000..4c76357
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerFunctions.java
@@ -0,0 +1,272 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.Serializable;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+
+public class ClusterControllerFunctions {
+ public enum FunctionId {
+ REGISTER_NODE,
+ UNREGISTER_NODE,
+ NOTIFY_TASK_COMPLETE,
+ NOTIFY_TASK_FAILURE,
+ NOTIFY_JOBLET_CLEANUP,
+ NODE_HEARTBEAT,
+ REPORT_PROFILE,
+ REGISTER_PARTITION_PROVIDER,
+ REGISTER_PARTITION_REQUEST,
+ }
+
+ public static abstract class Function implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public abstract FunctionId getFunctionId();
+ }
+
+ public static class RegisterNodeFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final NodeRegistration reg;
+
+ public RegisterNodeFunction(NodeRegistration reg) {
+ this.reg = reg;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_NODE;
+ }
+
+ public NodeRegistration getNodeRegistration() {
+ return reg;
+ }
+ }
+
+ public static class UnregisterNodeFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+
+ public UnregisterNodeFunction(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.UNREGISTER_NODE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ public static class NotifyTaskCompleteFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final TaskAttemptId taskId;
+ private final String nodeId;
+ private final TaskProfile statistics;
+
+ public NotifyTaskCompleteFunction(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics) {
+ this.jobId = jobId;
+ this.taskId = taskId;
+ this.nodeId = nodeId;
+ this.statistics = statistics;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_TASK_COMPLETE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public TaskAttemptId getTaskId() {
+ return taskId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public TaskProfile getStatistics() {
+ return statistics;
+ }
+ }
+
+ public static class NotifyTaskFailureFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final TaskAttemptId taskId;
+ private final String nodeId;
+ private final String details;
+
+ public NotifyTaskFailureFunction(JobId jobId, TaskAttemptId taskId, String nodeId, String details) {
+ this.jobId = jobId;
+ this.taskId = taskId;
+ this.nodeId = nodeId;
+ this.details = details;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_TASK_FAILURE;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public TaskAttemptId getTaskId() {
+ return taskId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public String getDetails() {
+ return details;
+ }
+ }
+
+ public static class NotifyJobletCleanupFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final String nodeId;
+
+ public NotifyJobletCleanupFunction(JobId jobId, String nodeId) {
+ this.jobId = jobId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NOTIFY_JOBLET_CLEANUP;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+ }
+
+ public static class NodeHeartbeatFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final HeartbeatData hbData;
+
+ public NodeHeartbeatFunction(String nodeId, HeartbeatData hbData) {
+ this.nodeId = nodeId;
+ this.hbData = hbData;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.NODE_HEARTBEAT;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public HeartbeatData getHeartbeatData() {
+ return hbData;
+ }
+ }
+
+ public static class ReportProfileFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+ private final List<JobProfile> profiles;
+
+ public ReportProfileFunction(String nodeId, List<JobProfile> profiles) {
+ this.nodeId = nodeId;
+ this.profiles = profiles;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_PROFILE;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public List<JobProfile> getProfiles() {
+ return profiles;
+ }
+ }
+
+ public static class RegisterPartitionProviderFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionDescriptor partitionDescriptor;
+
+ public RegisterPartitionProviderFunction(PartitionDescriptor partitionDescriptor) {
+ this.partitionDescriptor = partitionDescriptor;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_PARTITION_PROVIDER;
+ }
+
+ public PartitionDescriptor getPartitionDescriptor() {
+ return partitionDescriptor;
+ }
+ }
+
+ public static class RegisterPartitionRequestFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionRequest partitionRequest;
+
+ public RegisterPartitionRequestFunction(PartitionRequest partitionRequest) {
+ this.partitionRequest = partitionRequest;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REGISTER_PARTITION_REQUEST;
+ }
+
+ public PartitionRequest getPartitionRequest() {
+ return partitionRequest;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
new file mode 100644
index 0000000..0aeab72
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.control.common.base.IClusterController;
+import edu.uci.ics.hyracks.control.common.controllers.NodeParameters;
+import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
+import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
+import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
+import edu.uci.ics.hyracks.control.common.job.PartitionRequest;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
+import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class ClusterControllerRemoteProxy implements IClusterController {
+ private final IIPCHandle ipcHandle;
+
+ public ClusterControllerRemoteProxy(IIPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
+ }
+
+ @Override
+ public NodeParameters registerNode(NodeRegistration reg) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.RegisterNodeFunction fn = new ClusterControllerFunctions.RegisterNodeFunction(reg);
+ NodeParameters result = (NodeParameters) sync.call(ipcHandle, fn);
+ return result;
+ }
+
+ @Override
+ public void unregisterNode(String nodeId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.UnregisterNodeFunction fn = new ClusterControllerFunctions.UnregisterNodeFunction(
+ nodeId);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void notifyTaskComplete(JobId jobId, TaskAttemptId taskId, String nodeId, TaskProfile statistics)
+ throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.NotifyTaskCompleteFunction fn = new ClusterControllerFunctions.NotifyTaskCompleteFunction(
+ jobId, taskId, nodeId, statistics);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void notifyTaskFailure(JobId jobId, TaskAttemptId taskId, String nodeId, String details) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.NotifyTaskFailureFunction fn = new ClusterControllerFunctions.NotifyTaskFailureFunction(
+ jobId, taskId, nodeId, details);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void notifyJobletCleanup(JobId jobId, String nodeId) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.NotifyJobletCleanupFunction fn = new ClusterControllerFunctions.NotifyJobletCleanupFunction(
+ jobId, nodeId);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.NodeHeartbeatFunction fn = new ClusterControllerFunctions.NodeHeartbeatFunction(id,
+ hbData);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void reportProfile(String id, List<JobProfile> profiles) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.ReportProfileFunction fn = new ClusterControllerFunctions.ReportProfileFunction(id,
+ profiles);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void registerPartitionProvider(PartitionDescriptor partitionDescriptor) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.RegisterPartitionProviderFunction fn = new ClusterControllerFunctions.RegisterPartitionProviderFunction(
+ partitionDescriptor);
+ sync.call(ipcHandle, fn);
+ }
+
+ @Override
+ public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ ClusterControllerFunctions.RegisterPartitionRequestFunction fn = new ClusterControllerFunctions.RegisterPartitionRequestFunction(
+ partitionRequest);
+ sync.call(ipcHandle, fn);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java
new file mode 100644
index 0000000..f3e51ec
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerDelegateIPCI.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.IIPCI;
+
+public class NodeControllerDelegateIPCI implements IIPCI {
+ private final INodeController nc;
+
+ public NodeControllerDelegateIPCI(INodeController nc) {
+ this.nc = nc;
+ }
+
+ @Override
+ public Object call(IIPCHandle caller, Object req) throws Exception {
+ NodeControllerFunctions.Function fn = (NodeControllerFunctions.Function) req;
+ switch (fn.getFunctionId()) {
+ case START_TASKS: {
+ NodeControllerFunctions.StartTasksFunction stf = (NodeControllerFunctions.StartTasksFunction) fn;
+ nc.startTasks(stf.getAppName(), stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(),
+ stf.getConnectorPolicies());
+ return null;
+ }
+
+ case ABORT_TASKS: {
+ NodeControllerFunctions.AbortTasksFunction atf = (NodeControllerFunctions.AbortTasksFunction) fn;
+ nc.abortTasks(atf.getJobId(), atf.getTasks());
+ return null;
+ }
+
+ case CLEANUP_JOBLET: {
+ NodeControllerFunctions.CleanupJobletFunction cjf = (NodeControllerFunctions.CleanupJobletFunction) fn;
+ nc.cleanUpJoblet(cjf.getJobId(), cjf.getStatus());
+ return null;
+ }
+
+ case CREATE_APPLICATION: {
+ NodeControllerFunctions.CreateApplicationFunction caf = (NodeControllerFunctions.CreateApplicationFunction) fn;
+ nc.createApplication(caf.getAppName(), caf.isDeployHar(), caf.getSerializedDistributedState());
+ return null;
+ }
+
+ case DESTROY_APPLICATION: {
+ NodeControllerFunctions.DestroyApplicationFunction daf = (NodeControllerFunctions.DestroyApplicationFunction) fn;
+ nc.destroyApplication(daf.getAppName());
+ return null;
+ }
+
+ case REPORT_PARTITION_AVAILABILITY: {
+ NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = (NodeControllerFunctions.ReportPartitionAvailabilityFunction) fn;
+ nc.reportPartitionAvailability(rpaf.getPartitionId(), rpaf.getNetworkAddress());
+ return null;
+ }
+ }
+ throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
new file mode 100644
index 0000000..0d39c1b
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerFunctions.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+
+public class NodeControllerFunctions {
+ public enum FunctionId {
+ START_TASKS,
+ ABORT_TASKS,
+ CLEANUP_JOBLET,
+ CREATE_APPLICATION,
+ DESTROY_APPLICATION,
+ REPORT_PARTITION_AVAILABILITY
+ }
+
+ public static abstract class Function implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public abstract FunctionId getFunctionId();
+ }
+
+ public static class StartTasksFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final JobId jobId;
+ private final byte[] planBytes;
+ private final List<TaskAttemptDescriptor> taskDescriptors;
+ private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+
+ public StartTasksFunction(String appName, JobId jobId, byte[] planBytes,
+ List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+ this.appName = appName;
+ this.jobId = jobId;
+ this.planBytes = planBytes;
+ this.taskDescriptors = taskDescriptors;
+ this.connectorPolicies = connectorPolicies;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_TASKS;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public byte[] getPlanBytes() {
+ return planBytes;
+ }
+
+ public List<TaskAttemptDescriptor> getTaskDescriptors() {
+ return taskDescriptors;
+ }
+
+ public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicies() {
+ return connectorPolicies;
+ }
+ }
+
+ public static class AbortTasksFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final List<TaskAttemptId> tasks;
+
+ public AbortTasksFunction(JobId jobId, List<TaskAttemptId> tasks) {
+ this.jobId = jobId;
+ this.tasks = tasks;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.ABORT_TASKS;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public List<TaskAttemptId> getTasks() {
+ return tasks;
+ }
+ }
+
+ public static class CleanupJobletFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+ private final JobStatus status;
+
+ public CleanupJobletFunction(JobId jobId, JobStatus status) {
+ this.jobId = jobId;
+ this.status = status;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CLEANUP_JOBLET;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public JobStatus getStatus() {
+ return status;
+ }
+ }
+
+ public static class CreateApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final boolean deployHar;
+ private final byte[] serializedDistributedState;
+
+ public CreateApplicationFunction(String appName, boolean deployHar, byte[] serializedDistributedState) {
+ this.appName = appName;
+ this.deployHar = deployHar;
+ this.serializedDistributedState = serializedDistributedState;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CREATE_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public boolean isDeployHar() {
+ return deployHar;
+ }
+
+ public byte[] getSerializedDistributedState() {
+ return serializedDistributedState;
+ }
+ }
+
+ public static class DestroyApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public DestroyApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class ReportPartitionAvailabilityFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final PartitionId pid;
+ private final NetworkAddress networkAddress;
+
+ public ReportPartitionAvailabilityFunction(PartitionId pid, NetworkAddress networkAddress) {
+ this.pid = pid;
+ this.networkAddress = networkAddress;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.REPORT_PARTITION_AVAILABILITY;
+ }
+
+ public PartitionId getPartitionId() {
+ return pid;
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return networkAddress;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
new file mode 100644
index 0000000..ccd9468
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.ipc;
+
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.SyncRMI;
+
+public class NodeControllerRemoteProxy implements INodeController {
+ private final IIPCHandle ipcHandle;
+
+ public NodeControllerRemoteProxy(IIPCHandle ipcHandle) {
+ this.ipcHandle = ipcHandle;
+ }
+
+ @Override
+ public void startTasks(String appName, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+ Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ NodeControllerFunctions.StartTasksFunction stf = new NodeControllerFunctions.StartTasksFunction(appName, jobId,
+ planBytes, taskDescriptors, connectorPolicies);
+ sync.call(ipcHandle, stf);
+ }
+
+ @Override
+ public void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ NodeControllerFunctions.AbortTasksFunction atf = new NodeControllerFunctions.AbortTasksFunction(jobId, tasks);
+ sync.call(ipcHandle, atf);
+ }
+
+ @Override
+ public void cleanUpJoblet(JobId jobId, JobStatus status) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ NodeControllerFunctions.CleanupJobletFunction cjf = new NodeControllerFunctions.CleanupJobletFunction(jobId,
+ status);
+ sync.call(ipcHandle, cjf);
+ }
+
+ @Override
+ public void createApplication(String appName, boolean deployHar, byte[] serializedDistributedState)
+ throws Exception {
+ SyncRMI sync = new SyncRMI();
+ NodeControllerFunctions.CreateApplicationFunction caf = new NodeControllerFunctions.CreateApplicationFunction(
+ appName, deployHar, serializedDistributedState);
+ sync.call(ipcHandle, caf);
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+ SyncRMI sync = new SyncRMI();
+ NodeControllerFunctions.DestroyApplicationFunction daf = new NodeControllerFunctions.DestroyApplicationFunction(
+ appName);
+ sync.call(ipcHandle, daf);
+ }
+
+ @Override
+ public void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception {
+ NodeControllerFunctions.ReportPartitionAvailabilityFunction rpaf = new NodeControllerFunctions.ReportPartitionAvailabilityFunction(
+ pid, networkAddress);
+ ipcHandle.send(rpaf, null);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
new file mode 100644
index 0000000..f8fba7a
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/profiling/counters/MultiResolutionEventProfiler.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.profiling.counters;
+
+import java.io.Serializable;
+
+public class MultiResolutionEventProfiler implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final int[] times;
+
+ private long offset;
+
+ private int ptr;
+
+ private int resolution;
+
+ private int eventCounter;
+
+ public MultiResolutionEventProfiler(int nSamples) {
+ times = new int[nSamples];
+ offset = -1;
+ ptr = 0;
+ resolution = 1;
+ eventCounter = 0;
+ }
+
+ public void reportEvent() {
+ ++eventCounter;
+ if (eventCounter % resolution != 0) {
+ return;
+ }
+ if (ptr >= times.length) {
+ compact();
+ return;
+ }
+ eventCounter = 0;
+ long time = System.currentTimeMillis();
+ if (offset < 0) {
+ offset = time;
+ }
+ int value = (int) (time - offset);
+ times[ptr++] = value;
+ }
+
+ private void compact() {
+ for (int i = 1; i < ptr / 2; ++i) {
+ times[i] = times[i * 2];
+ }
+ resolution <<= 1;
+ ptr >>= 1;
+ }
+
+ public int getResolution() {
+ return resolution;
+ }
+
+ public int getCount() {
+ return ptr;
+ }
+
+ public int[] getSamples() {
+ return times;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/data/PointablePrimitiveValueProviderFactory.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/data/PointablePrimitiveValueProviderFactory.java
new file mode 100644
index 0000000..c159ba5
--- /dev/null
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/data/PointablePrimitiveValueProviderFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.common.data;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.api.INumeric;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.api.IPointableFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
+
+public class PointablePrimitiveValueProviderFactory implements IPrimitiveValueProviderFactory {
+ private static final long serialVersionUID = 1L;
+
+ private final IPointableFactory pf;
+
+ public PointablePrimitiveValueProviderFactory(IPointableFactory pf) {
+ this.pf = pf;
+ }
+
+ @Override
+ public IPrimitiveValueProvider createPrimitiveValueProvider() {
+ final IPointable p = pf.createPointable();
+ ITypeTraits traits = pf.getTypeTraits();
+ assert traits.isFixedLength();
+ final int length = traits.getFixedLength();
+ return new IPrimitiveValueProvider() {
+ @Override
+ public double getValue(byte[] bytes, int offset) {
+ p.set(bytes, offset, length);
+ return ((INumeric) p).doubleValue();
+ }
+ };
+ }
+}
\ No newline at end of file