Merged fullstack_staging branch into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/hyracks/hyracks-api/pom.xml b/fullstack/hyracks/hyracks-api/pom.xml
new file mode 100644
index 0000000..7a9f7ca
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/pom.xml
@@ -0,0 +1,57 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-api</artifactId>
+ <name>hyracks-api</name>
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ <version>20090211</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.1-alpha2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>args4j</groupId>
+ <artifactId>args4j</artifactId>
+ <version>2.0.12</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-ipc</artifactId>
+ <version>0.2.2-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.1</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
new file mode 100644
index 0000000..81cf511
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
+
+/**
+ * Base class of the {@link ICCApplicationContext} and the
+ * {@link INCApplicationContext}.
+ *
+ * @author vinayakb
+ *
+ */
+public interface IApplicationContext {
+ /**
+ * Provides the Class Loader that loads classes for this Hyracks Application
+ * at the CC.
+ *
+ * @return the application {@link ClassLoader}.
+ */
+ public ClassLoader getClassLoader();
+
+ /**
+ * Gets the distributed state that is made available to all the Application
+ * Contexts of this application in the cluster.
+ *
+ * @return
+ */
+ public Serializable getDistributedState();
+
+ public void setMessageBroker(IMessageBroker messageBroker);
+
+ public IMessageBroker getMessageBroker();
+
+ public String getApplicationName();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java
new file mode 100644
index 0000000..19f97b4
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+/**
+ * Base class of {@link ICCBootstrap} and {@link INCBootstrap}.
+ *
+ * @author vinayakb
+ */
+public interface IBootstrap {
+ /**
+ * Method called to start the application at a Hyracks CC or NC node.
+ *
+ * @throws Exception
+ */
+ public void start() throws Exception;
+
+ /**
+ * Method called to shutdown the application at a Hyracks CC or NC node.
+ *
+ * @throws Exception
+ */
+ public void stop() throws Exception;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
new file mode 100644
index 0000000..2792d29
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.ICCContext;
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
+
+/**
+ * Application Context at the Cluster Controller for an application.
+ *
+ * @author vinayakb
+ */
+public interface ICCApplicationContext extends IApplicationContext {
+ /**
+ * Sets the state that must be distributed by the infrastructure to all the
+ * NC application contects. Any state set by calling thsi method in the {@link ICCBootstrap#start()} call is made available to all the {@link INCApplicationContext} objects at each Node Controller. The state
+ * is then available to be inspected by the application at the NC during or
+ * after the {@link INCBootstrap#start()} call.
+ *
+ * @param state
+ * The distributed state
+ */
+ public void setDistributedState(Serializable state);
+
+ /**
+ * A listener that listens to Job Lifecycle events at the Cluster
+ * Controller.
+ *
+ * @param jobLifecycleListener
+ */
+ public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+
+ /**
+ * Get the Cluster Controller Context.
+ *
+ * @return The Cluster Controller Context.
+ */
+ public ICCContext getCCContext();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCBootstrap.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCBootstrap.java
new file mode 100644
index 0000000..e3906ea
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCBootstrap.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.api.application;
+
+/**
+ * Implemented by the bootstrap class of the application that will manage its
+ * life cycle at the Cluster Controller.
+ *
+ * @author vinayakb
+ *
+ */
+public interface ICCBootstrap extends IBootstrap {
+ /**
+ * Called by the infrastructure to set the CC Application Context for the
+ * application. The infrastructure makes this call prior to calling start().
+ *
+ * @param appCtx
+ * - The CC application context
+ */
+ public void setApplicationContext(ICCApplicationContext appCtx);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
new file mode 100644
index 0000000..24a598b
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+
+/**
+ * Application Context at the Node Controller for an application.
+ *
+ * @author vinayakb
+ *
+ */
+public interface INCApplicationContext extends IApplicationContext {
+ /**
+ * Gets the node Id of the Node Congtroller.
+ *
+ * @return the Node Id.
+ */
+ public String getNodeId();
+
+ /**
+ * Get the Hyracks Root Context.
+ *
+ * @return The Hyracks Root Context
+ */
+ public IHyracksRootContext getRootContext();
+
+ /**
+ * Set an object that can be later retrieved by the
+ * {@link #getApplicationObject()} call.
+ *
+ * @param object
+ * Application Object
+ */
+ public void setApplicationObject(Object object);
+
+ /**
+ * Get the application object previously set by the
+ * {@link #setApplicationObject(Object)} call.
+ *
+ * @return Application Object
+ */
+ public Object getApplicationObject();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCBootstrap.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCBootstrap.java
new file mode 100644
index 0000000..300f7c7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCBootstrap.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.api.application;
+
+/**
+ * Implemented by the bootstrap class of the application that will manage its
+ * life cycle at a Node Controller.
+ *
+ * @author vinayakb
+ *
+ */
+public interface INCBootstrap extends IBootstrap {
+ /**
+ * Called by the infrastructure to set the NC Application Context for the
+ * application. The infrastructure makes this call prior to calling start().
+ *
+ * @param appCtx
+ * - The NC application context
+ */
+ public void setApplicationContext(INCApplicationContext appCtx);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
new file mode 100644
index 0000000..a8f2fda
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.channels;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IInputChannel {
+ public void registerMonitor(IInputChannelMonitor monitor);
+
+ public void setAttachment(Object attachment);
+
+ public Object getAttachment();
+
+ public ByteBuffer getNextBuffer();
+
+ public void recycleBuffer(ByteBuffer buffer);
+
+ public void open(IHyracksTaskContext ctx) throws HyracksDataException;
+
+ public void close() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java
new file mode 100644
index 0000000..065f4c5
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.channels;
+
+public interface IInputChannelMonitor {
+ public void notifyFailure(IInputChannel channel);
+
+ public void notifyDataAvailability(IInputChannel channel, int nFrames);
+
+ public void notifyEndOfStream(IInputChannel channel);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/ClusterControllerInfo.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/ClusterControllerInfo.java
new file mode 100644
index 0000000..8dd4d6c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/ClusterControllerInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+
+public class ClusterControllerInfo implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String clientNetAddress;
+
+ private final int clientNetPort;
+
+ private final int webPort;
+
+ public ClusterControllerInfo(String clientNetAddress, int clientNetPort, int webPort) {
+ this.clientNetAddress = clientNetAddress;
+ this.clientNetPort = clientNetPort;
+ this.webPort = webPort;
+ }
+
+ public int getWebPort() {
+ return webPort;
+ }
+
+ public String getClientNetAddress() {
+ return clientNetAddress;
+ }
+
+ public int getClientNetPort() {
+ return clientNetPort;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
new file mode 100644
index 0000000..e34e60d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class HyracksClientInterfaceFunctions {
+ public enum FunctionId {
+ GET_CLUSTER_CONTROLLER_INFO,
+ GET_CLUSTER_TOPOLOGY,
+ CREATE_APPLICATION,
+ START_APPLICATION,
+ DESTROY_APPLICATION,
+ CREATE_JOB,
+ GET_JOB_STATUS,
+ START_JOB,
+ WAIT_FOR_COMPLETION,
+ GET_NODE_CONTROLLERS_INFO
+ }
+
+ public abstract static class Function implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public abstract FunctionId getFunctionId();
+ }
+
+ public static class GetClusterControllerInfoFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_CLUSTER_CONTROLLER_INFO;
+ }
+ }
+
+ public static class CreateApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public CreateApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.CREATE_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class StartApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public StartApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class DestroyApplicationFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+
+ public DestroyApplicationFunction(String appName) {
+ this.appName = appName;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.DESTROY_APPLICATION;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+ }
+
+ public static class GetJobStatusFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public GetJobStatusFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_JOB_STATUS;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
+ public static class StartJobFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final String appName;
+ private final byte[] acggfBytes;
+ private final EnumSet<JobFlag> jobFlags;
+
+ public StartJobFunction(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
+ this.appName = appName;
+ this.acggfBytes = acggfBytes;
+ this.jobFlags = jobFlags;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.START_JOB;
+ }
+
+ public String getAppName() {
+ return appName;
+ }
+
+ public byte[] getACGGFBytes() {
+ return acggfBytes;
+ }
+
+ public EnumSet<JobFlag> getJobFlags() {
+ return jobFlags;
+ }
+ }
+
+ public static class WaitForCompletionFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public WaitForCompletionFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.WAIT_FOR_COMPLETION;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
+ public static class GetNodeControllersInfoFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_NODE_CONTROLLERS_INFO;
+ }
+ }
+
+ public static class GetClusterTopologyFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_CLUSTER_TOPOLOGY;
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
new file mode 100644
index 0000000..4c06d42
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+
+public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface {
+ private final IIPCHandle ipcHandle;
+
+ private final RPCInterface rpci;
+
+ public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) {
+ this.ipcHandle = ipcHandle;
+ this.rpci = rpci;
+ }
+
+ @Override
+ public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+ HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+ return (ClusterControllerInfo) rpci.call(ipcHandle, gccif);
+ }
+
+ @Override
+ public void createApplication(String appName) throws Exception {
+ HyracksClientInterfaceFunctions.CreateApplicationFunction caf = new HyracksClientInterfaceFunctions.CreateApplicationFunction(
+ appName);
+ rpci.call(ipcHandle, caf);
+ }
+
+ @Override
+ public void startApplication(String appName) throws Exception {
+ HyracksClientInterfaceFunctions.StartApplicationFunction saf = new HyracksClientInterfaceFunctions.StartApplicationFunction(
+ appName);
+ rpci.call(ipcHandle, saf);
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+ HyracksClientInterfaceFunctions.DestroyApplicationFunction daf = new HyracksClientInterfaceFunctions.DestroyApplicationFunction(
+ appName);
+ rpci.call(ipcHandle, daf);
+ }
+
+ @Override
+ public JobStatus getJobStatus(JobId jobId) throws Exception {
+ HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = new HyracksClientInterfaceFunctions.GetJobStatusFunction(
+ jobId);
+ return (JobStatus) rpci.call(ipcHandle, gjsf);
+ }
+
+ @Override
+ public JobId startJob(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception {
+ HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
+ appName, acggfBytes, jobFlags);
+ return (JobId) rpci.call(ipcHandle, sjf);
+ }
+
+ @Override
+ public void waitForCompletion(JobId jobId) throws Exception {
+ HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = new HyracksClientInterfaceFunctions.WaitForCompletionFunction(
+ jobId);
+ rpci.call(ipcHandle, wfcf);
+ }
+
+ @Override
+ public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+ HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+ return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
+ }
+
+ @Override
+ public ClusterTopology getClusterTopology() throws Exception {
+ HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf = new HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
+ return (ClusterTopology) rpci.call(ipcHandle, gctf);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
new file mode 100644
index 0000000..227524c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.FileEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import edu.uci.ics.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+/**
+ * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
+ * Controller.
+ *
+ * @author vinayakb
+ */
+public final class HyracksConnection implements IHyracksClientConnection {
+ private final String ccHost;
+
+ private final IPCSystem ipc;
+
+ private final IHyracksClientInterface hci;
+
+ private final ClusterControllerInfo ccInfo;
+
+ /**
+ * Constructor to create a connection to the Hyracks Cluster Controller.
+ *
+ * @param ccHost
+ * Host name (or IP Address) where the Cluster Controller can be
+ * reached.
+ * @param ccPort
+ * Port to reach the Hyracks Cluster Controller at the specified
+ * host name.
+ * @throws Exception
+ */
+ public HyracksConnection(String ccHost, int ccPort) throws Exception {
+ this.ccHost = ccHost;
+ RPCInterface rpci = new RPCInterface();
+ ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
+ ipc.start();
+ IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
+ this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle, rpci);
+ ccInfo = hci.getClusterControllerInfo();
+ }
+
+ @Override
+ public void createApplication(String appName, File harFile) throws Exception {
+ hci.createApplication(appName);
+ if (harFile != null) {
+ HttpClient hc = new DefaultHttpClient();
+ HttpPut put = new HttpPut("http://" + ccHost + ":" + ccInfo.getWebPort() + "/applications/" + appName);
+ put.setEntity(new FileEntity(harFile, "application/octet-stream"));
+ HttpResponse response = hc.execute(put);
+ if (response.getStatusLine().getStatusCode() != 200) {
+ hci.destroyApplication(appName);
+ throw new HyracksException(response.getStatusLine().toString());
+ }
+ }
+ hci.startApplication(appName);
+ }
+
+ @Override
+ public void destroyApplication(String appName) throws Exception {
+ hci.destroyApplication(appName);
+ }
+
+ @Override
+ public JobStatus getJobStatus(JobId jobId) throws Exception {
+ return hci.getJobStatus(jobId);
+ }
+
+ @Override
+ public JobId startJob(String appName, JobSpecification jobSpec) throws Exception {
+ return startJob(appName, jobSpec, EnumSet.noneOf(JobFlag.class));
+ }
+
+ @Override
+ public JobId startJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ JobSpecificationActivityClusterGraphGeneratorFactory jsacggf = new JobSpecificationActivityClusterGraphGeneratorFactory(
+ jobSpec);
+ return startJob(appName, jsacggf, jobFlags);
+ }
+
+ public JobId startJob(String appName, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+ throws Exception {
+ return hci.startJob(appName, JavaSerializationUtils.serialize(acggf), jobFlags);
+ }
+
+ @Override
+ public void waitForCompletion(JobId jobId) throws Exception {
+ hci.waitForCompletion(jobId);
+ }
+
+ @Override
+ public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
+ return hci.getNodeControllersInfo();
+ }
+
+ @Override
+ public ClusterTopology getClusterTopology() throws Exception {
+ return hci.getClusterTopology();
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
new file mode 100644
index 0000000..bdbb544
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.File;
+import java.util.EnumSet;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+
+/**
+ * Interface used by clients to communicate with the Hyracks Cluster Controller.
+ *
+ * @author vinayakb
+ */
+public interface IHyracksClientConnection {
+ /**
+ * Create a Hyracks Application
+ *
+ * @param appName
+ * Name of the application
+ * @param harFile
+ * Archive that contains deployable code for the application
+ * @throws Exception
+ */
+ public void createApplication(String appName, File harFile) throws Exception;
+
+ /**
+ * Destroy an already-deployed Hyracks application
+ *
+ * @param appName
+ * Name of the application
+ * @throws Exception
+ */
+ public void destroyApplication(String appName) throws Exception;
+
+ /**
+ * Gets the status of the specified Job.
+ *
+ * @param jobId
+ * JobId of the Job
+ * @return {@link JobStatus}
+ * @throws Exception
+ */
+ public JobStatus getJobStatus(JobId jobId) throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param appName
+ * Name of the application
+ * @param jobSpec
+ * Job Specification
+ * @throws Exception
+ */
+ public JobId startJob(String appName, JobSpecification jobSpec) throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param appName
+ * Name of the application
+ * @param jobSpec
+ * Job Specification
+ * @param jobFlags
+ * Flags
+ * @throws Exception
+ */
+ public JobId startJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+
+ /**
+ * Start the specified Job.
+ *
+ * @param appName
+ * Name of the application
+ * @param acggf
+ * Activity Cluster Graph Generator Factory
+ * @param jobFlags
+ * Flags
+ * @throws Exception
+ */
+ public JobId startJob(String appName, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+ throws Exception;
+
+ /**
+ * Waits until the specified job has completed, either successfully or has
+ * encountered a permanent failure.
+ *
+ * @param jobId
+ * JobId of the Job
+ * @throws Exception
+ */
+ public void waitForCompletion(JobId jobId) throws Exception;
+
+ /**
+ * Gets a map of node controller names to node information.
+ *
+ * @return Map of node name to node information.
+ */
+ public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception;
+
+ /**
+ * Get the cluster topology
+ *
+ * @return the cluster topology
+ * @throws Exception
+ */
+ public ClusterTopology getClusterTopology() throws Exception;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
new file mode 100644
index 0000000..ef5906e
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+
+public interface IHyracksClientInterface {
+ public ClusterControllerInfo getClusterControllerInfo() throws Exception;
+
+ public void createApplication(String appName) throws Exception;
+
+ public void startApplication(String appName) throws Exception;
+
+ public void destroyApplication(String appName) throws Exception;
+
+ public JobStatus getJobStatus(JobId jobId) throws Exception;
+
+ public JobId startJob(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
+
+ public void waitForCompletion(JobId jobId) throws Exception;
+
+ public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception;
+
+ public ClusterTopology getClusterTopology() throws Exception;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
new file mode 100644
index 0000000..fd9218a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+
+public class NodeControllerInfo implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String nodeId;
+
+ private final NodeStatus status;
+
+ private final NetworkAddress netAddress;
+
+ public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress) {
+ this.nodeId = nodeId;
+ this.status = status;
+ this.netAddress = netAddress;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public NodeStatus getStatus() {
+ return status;
+ }
+
+ public NetworkAddress getNetworkAddress() {
+ return netAddress;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeStatus.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeStatus.java
new file mode 100644
index 0000000..d6b99d0
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeStatus.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+public enum NodeStatus {
+ ALIVE,
+ DEAD
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ActivityClusterGraphBuilder.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
new file mode 100644
index 0000000..6c6c211
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONException;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class ActivityClusterGraphBuilder {
+ private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
+
+ public ActivityClusterGraphBuilder() {
+ }
+
+ private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, Set<Set<ActivityId>> eqSets) {
+ for (Set<ActivityId> eqSet : eqSets) {
+ for (ActivityId t : eqSet) {
+ List<IConnectorDescriptor> inputList = jag.getActivityInputMap().get(t);
+ if (inputList != null) {
+ for (IConnectorDescriptor conn : inputList) {
+ ActivityId inTask = jag.getProducerActivity(conn.getConnectorId());
+ if (!eqSet.contains(inTask)) {
+ return Pair.<ActivityId, ActivityId> of(t, inTask);
+ }
+ }
+ }
+ List<IConnectorDescriptor> outputList = jag.getActivityOutputMap().get(t);
+ if (outputList != null) {
+ for (IConnectorDescriptor conn : outputList) {
+ ActivityId outTask = jag.getConsumerActivity(conn.getConnectorId());
+ if (!eqSet.contains(outTask)) {
+ return Pair.<ActivityId, ActivityId> of(t, outTask);
+ }
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public ActivityClusterGraph inferActivityClusters(JobId jobId, JobActivityGraph jag) {
+ /*
+ * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
+ */
+ Map<ActivityId, Set<ActivityId>> stageMap = new HashMap<ActivityId, Set<ActivityId>>();
+ Set<Set<ActivityId>> stages = new HashSet<Set<ActivityId>>();
+ for (ActivityId taskId : jag.getActivityMap().keySet()) {
+ Set<ActivityId> eqSet = new HashSet<ActivityId>();
+ eqSet.add(taskId);
+ stageMap.put(taskId, eqSet);
+ stages.add(eqSet);
+ }
+
+ boolean changed = true;
+ while (changed) {
+ changed = false;
+ Pair<ActivityId, ActivityId> pair = findMergePair(jag, stages);
+ if (pair != null) {
+ merge(stageMap, stages, pair.getLeft(), pair.getRight());
+ changed = true;
+ }
+ }
+
+ ActivityClusterGraph acg = new ActivityClusterGraph();
+ Map<ActivityId, ActivityCluster> acMap = new HashMap<ActivityId, ActivityCluster>();
+ int acCounter = 0;
+ Map<ActivityId, IActivity> activityNodeMap = jag.getActivityMap();
+ List<ActivityCluster> acList = new ArrayList<ActivityCluster>();
+ for (Set<ActivityId> stage : stages) {
+ ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(jobId, acCounter++));
+ acList.add(ac);
+ for (ActivityId aid : stage) {
+ IActivity activity = activityNodeMap.get(aid);
+ ac.addActivity(activity);
+ acMap.put(aid, ac);
+ }
+ }
+
+ for (Set<ActivityId> stage : stages) {
+ for (ActivityId aid : stage) {
+ IActivity activity = activityNodeMap.get(aid);
+ ActivityCluster ac = acMap.get(aid);
+ List<IConnectorDescriptor> aOutputs = jag.getActivityOutputMap().get(aid);
+ if (aOutputs == null || aOutputs.isEmpty()) {
+ ac.addRoot(activity);
+ } else {
+ int nActivityOutputs = aOutputs.size();
+ for (int i = 0; i < nActivityOutputs; ++i) {
+ IConnectorDescriptor conn = aOutputs.get(i);
+ ac.addConnector(conn);
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> pcPair = jag.getConnectorActivityMap()
+ .get(conn.getConnectorId());
+ ac.connect(conn, activity, i, pcPair.getRight().getLeft(), pcPair.getRight().getRight(), jag
+ .getConnectorRecordDescriptorMap().get(conn.getConnectorId()));
+ }
+ }
+ }
+ }
+
+ Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = jag.getBlocked2BlockerMap();
+ for (ActivityCluster s : acList) {
+ Map<ActivityId, Set<ActivityId>> acBlocked2BlockerMap = s.getBlocked2BlockerMap();
+ Set<ActivityCluster> blockerStages = new HashSet<ActivityCluster>();
+ for (ActivityId t : s.getActivityMap().keySet()) {
+ Set<ActivityId> blockerTasks = blocked2BlockerMap.get(t);
+ acBlocked2BlockerMap.put(t, blockerTasks);
+ if (blockerTasks != null) {
+ for (ActivityId bt : blockerTasks) {
+ blockerStages.add(acMap.get(bt));
+ }
+ }
+ }
+ for (ActivityCluster bs : blockerStages) {
+ s.getDependencies().add(bs);
+ }
+ }
+ acg.addActivityClusters(acList);
+
+ if (LOGGER.isLoggable(Level.FINE)) {
+ try {
+ LOGGER.fine(acg.toJSON().toString(2));
+ } catch (JSONException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ return acg;
+ }
+
+ private void merge(Map<ActivityId, Set<ActivityId>> eqSetMap, Set<Set<ActivityId>> eqSets, ActivityId t1,
+ ActivityId t2) {
+ Set<ActivityId> stage1 = eqSetMap.get(t1);
+ Set<ActivityId> stage2 = eqSetMap.get(t2);
+
+ Set<ActivityId> mergedSet = new HashSet<ActivityId>();
+ mergedSet.addAll(stage1);
+ mergedSet.addAll(stage2);
+
+ eqSets.remove(stage1);
+ eqSets.remove(stage2);
+ eqSets.add(mergedSet);
+
+ for (ActivityId t : mergedSet) {
+ eqSetMap.put(t, mergedSet);
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IConnectorDescriptorVisitor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IConnectorDescriptorVisitor.java
new file mode 100644
index 0000000..5b508b3
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IConnectorDescriptorVisitor.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client.impl;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+public interface IConnectorDescriptorVisitor {
+ public void visit(IConnectorDescriptor conn) throws HyracksException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IOperatorDescriptorVisitor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IOperatorDescriptorVisitor.java
new file mode 100644
index 0000000..fcdf9f0
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IOperatorDescriptorVisitor.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client.impl;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+public interface IOperatorDescriptorVisitor {
+ public void visit(IOperatorDescriptor op) throws HyracksException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobActivityGraphBuilder.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobActivityGraphBuilder.java
new file mode 100644
index 0000000..b4b9b3c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobActivityGraphBuilder.java
@@ -0,0 +1,127 @@
+package edu.uci.ics.hyracks.api.client.impl;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobActivityGraphBuilder implements IActivityGraphBuilder {
+ private static final Logger LOGGER = Logger.getLogger(JobActivityGraphBuilder.class.getName());
+
+ private final Map<ActivityId, IOperatorDescriptor> activityOperatorMap;
+
+ private final JobActivityGraph jag;
+
+ private final JobSpecification jobSpec;
+
+ private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorProducerMap;
+
+ private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorConsumerMap;
+
+ public JobActivityGraphBuilder(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+ activityOperatorMap = new HashMap<ActivityId, IOperatorDescriptor>();
+ jag = new JobActivityGraph();
+ this.jobSpec = jobSpec;
+ connectorProducerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
+ connectorConsumerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
+ }
+
+ public void addConnector(IConnectorDescriptor conn) {
+ jag.getConnectorMap().put(conn.getConnectorId(), conn);
+ jag.getConnectorRecordDescriptorMap().put(conn.getConnectorId(), jobSpec.getConnectorRecordDescriptor(conn));
+ }
+
+ @Override
+ public void addBlockingEdge(IActivity blocker, IActivity blocked) {
+ addToValueSet(jag.getBlocked2BlockerMap(), blocked.getActivityId(), blocker.getActivityId());
+ }
+
+ @Override
+ public void addSourceEdge(int operatorInputIndex, IActivity task, int taskInputIndex) {
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Adding source edge: " + task.getActivityId() + ":" + operatorInputIndex + " -> "
+ + task.getActivityId() + ":" + taskInputIndex);
+ }
+ IOperatorDescriptor op = activityOperatorMap.get(task.getActivityId());
+ IConnectorDescriptor conn = jobSpec.getInputConnectorDescriptor(op, operatorInputIndex);
+ insertIntoIndexedMap(jag.getActivityInputMap(), task.getActivityId(), taskInputIndex, conn);
+ connectorConsumerMap.put(conn.getConnectorId(), Pair.of(task, taskInputIndex));
+ }
+
+ @Override
+ public void addTargetEdge(int operatorOutputIndex, IActivity task, int taskOutputIndex) {
+ if (LOGGER.isLoggable(Level.FINEST)) {
+ LOGGER.finest("Adding target edge: " + task.getActivityId() + ":" + operatorOutputIndex + " -> "
+ + task.getActivityId() + ":" + taskOutputIndex);
+ }
+ IOperatorDescriptor op = activityOperatorMap.get(task.getActivityId());
+ IConnectorDescriptor conn = jobSpec.getOutputConnectorDescriptor(op, operatorOutputIndex);
+ insertIntoIndexedMap(jag.getActivityOutputMap(), task.getActivityId(), taskOutputIndex, conn);
+ connectorProducerMap.put(conn.getConnectorId(), Pair.of(task, taskOutputIndex));
+ }
+
+ @Override
+ public void addActivity(IOperatorDescriptor op, IActivity task) {
+ activityOperatorMap.put(task.getActivityId(), op);
+ ActivityId activityId = task.getActivityId();
+ jag.getActivityMap().put(activityId, task);
+ }
+
+ public void finish() {
+ Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> caMap = jag
+ .getConnectorActivityMap();
+ for (Map.Entry<ConnectorDescriptorId, Pair<IActivity, Integer>> e : connectorProducerMap.entrySet()) {
+ ConnectorDescriptorId cdId = e.getKey();
+ Pair<IActivity, Integer> producer = e.getValue();
+ Pair<IActivity, Integer> consumer = connectorConsumerMap.get(cdId);
+ caMap.put(cdId, Pair.of(producer, consumer));
+ }
+ }
+
+ private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
+ Set<V> targets = map.get(n1);
+ if (targets == null) {
+ targets = new HashSet<V>();
+ map.put(n1, targets);
+ }
+ targets.add(n2);
+ }
+
+ private <T> void extend(List<T> list, int index) {
+ int n = list.size();
+ for (int i = n; i <= index; ++i) {
+ list.add(null);
+ }
+ }
+
+ private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
+ List<V> vList = map.get(key);
+ if (vList == null) {
+ vList = new ArrayList<V>();
+ map.put(key, vList);
+ }
+ extend(vList, index);
+ vList.set(index, value);
+ }
+
+ public JobActivityGraph getActivityGraph() {
+ return jag;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
new file mode 100644
index 0000000..f36b7b3
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.hyracks.api.client.impl;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobSpecificationActivityClusterGraphGeneratorFactory implements IActivityClusterGraphGeneratorFactory {
+ private static final long serialVersionUID = 1L;
+
+ private final JobSpecification spec;
+
+ public JobSpecificationActivityClusterGraphGeneratorFactory(JobSpecification jobSpec) {
+ this.spec = jobSpec;
+ }
+
+ @Override
+ public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(String appName, JobId jobId,
+ final ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException {
+ final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags);
+ PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+ @Override
+ public void visit(IConnectorDescriptor conn) throws HyracksException {
+ builder.addConnector(conn);
+ }
+ });
+ PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+ @Override
+ public void visit(IOperatorDescriptor op) {
+ op.contributeActivities(builder);
+ }
+ });
+ builder.finish();
+ final JobActivityGraph jag = builder.getActivityGraph();
+ ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder();
+
+ final ActivityClusterGraph acg = acgb.inferActivityClusters(jobId, jag);
+ acg.setFrameSize(spec.getFrameSize());
+ acg.setMaxReattempts(spec.getMaxReattempts());
+ acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
+ acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
+ acg.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
+ acg.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
+ final Set<Constraint> constraints = new HashSet<Constraint>();
+ final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
+ @Override
+ public void addConstraint(Constraint constraint) {
+ constraints.add(constraint);
+ }
+ };
+ PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+ @Override
+ public void visit(IOperatorDescriptor op) {
+ op.contributeSchedulingConstraints(acceptor, ccAppCtx);
+ }
+ });
+ PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+ @Override
+ public void visit(IConnectorDescriptor conn) {
+ conn.contributeSchedulingConstraints(acceptor, acg.getConnectorMap().get(conn.getConnectorId()),
+ ccAppCtx);
+ }
+ });
+ constraints.addAll(spec.getUserConstraints());
+ return new IActivityClusterGraphGenerator() {
+ @Override
+ public ActivityClusterGraph initialize() {
+ return acg;
+ }
+
+ @Override
+ public Set<Constraint> getConstraints() {
+ return constraints;
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/PlanUtils.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/PlanUtils.java
new file mode 100644
index 0000000..30f26c7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/PlanUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client.impl;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class PlanUtils {
+ public static void visit(JobSpecification spec, IOperatorDescriptorVisitor visitor) throws HyracksException {
+ Set<OperatorDescriptorId> seen = new HashSet<OperatorDescriptorId>();
+ for (IOperatorDescriptor op : spec.getOperatorMap().values()) {
+ visitOperator(visitor, seen, op);
+ }
+ }
+
+ private static void visitOperator(IOperatorDescriptorVisitor visitor, Set<OperatorDescriptorId> seen,
+ IOperatorDescriptor op) throws HyracksException {
+ if (!seen.contains(op)) {
+ visitor.visit(op);
+ }
+ seen.add(op.getOperatorId());
+ }
+
+ public static void visit(JobSpecification spec, IConnectorDescriptorVisitor visitor) throws HyracksException {
+ for (IConnectorDescriptor c : spec.getConnectorMap().values()) {
+ visitor.visit(c);
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java
new file mode 100644
index 0000000..cc76bd4
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+public interface FrameConstants {
+ public static final int SIZE_LEN = 4;
+
+ public static final boolean DEBUG_FRAME_IO = false;
+
+ public static final int FRAME_FIELD_MAGIC = 0x12345678;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java
new file mode 100644
index 0000000..c8478d6
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+public class FrameHelper {
+ public static int getTupleCountOffset(int frameSize) {
+ return frameSize - 4;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java
new file mode 100644
index 0000000..0188578
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IFrameReader {
+ public void open() throws HyracksDataException;
+
+ public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException;
+
+ public void close() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java
new file mode 100644
index 0000000..8b82728
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+public interface IFrameTupleAccessor {
+ public int getFieldCount();
+
+ public int getFieldSlotsLength();
+
+ public int getFieldEndOffset(int tupleIndex, int fIdx);
+
+ public int getFieldStartOffset(int tupleIndex, int fIdx);
+
+ public int getFieldLength(int tupleIndex, int fIdx);
+
+ public int getTupleEndOffset(int tupleIndex);
+
+ public int getTupleStartOffset(int tupleIndex);
+
+ public int getTupleCount();
+
+ public ByteBuffer getBuffer();
+
+ public void reset(ByteBuffer buffer);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
new file mode 100644
index 0000000..a85bd7e
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * {@link IFrameWriter} is the interface implemented by a stream consumer. An
+ * {@link IFrameWriter} could be in one of the following states:
+ * <ul>
+ * <li>INITIAL</li>
+ * <li>OPENED</li>
+ * <li>CLOSED</li>
+ * <li>FAILED</li>
+ * </ul>
+ * A producer follows the following protocol when using an {@link IFrameWriter}.
+ * Initially, the {@link IFrameWriter} is in the INITIAL state.
+ * The first valid call to an {@link IFrameWriter} is always the
+ * {@link IFrameWriter#open()}. This call provides the opportunity for the
+ * {@link IFrameWriter} implementation to allocate any resources for its
+ * processing. Once this call returns, the {@link IFrameWriter} is in the OPENED
+ * state. If an error occurs
+ * during the {@link IFrameWriter#open()} call, a {@link HyracksDataException}
+ * is thrown and it stays in the INITIAL state.
+ * While the {@link IFrameWriter} is in the OPENED state, the producer can call
+ * one of:
+ * <ul>
+ * <li> {@link IFrameWriter#close()} to give up any resources owned by the
+ * {@link IFrameWriter} and enter the CLOSED state.</li>
+ * <li> {@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the
+ * {@link IFrameWriter}. The call returns normally on success and the
+ * {@link IFrameWriter} remains in the OPENED state. On failure, the call throws
+ * a {@link HyracksDataException}, and the {@link IFrameWriter} enters the ERROR
+ * state.</li>
+ * <li> {@link IFrameWriter#fail()} to indicate that stream is to be aborted. The
+ * {@link IFrameWriter} enters the FAILED state.</li>
+ * </ul>
+ * In the FAILED state, the only call allowed is the
+ * {@link IFrameWriter#close()} to move the {@link IFrameWriter} into the CLOSED
+ * state and give up all resources.
+ * No calls are allowed when the {@link IFrameWriter} is in the CLOSED state.
+ *
+ * Note: If the call to {@link IFrameWriter#open()} failed, the
+ * {@link IFrameWriter#close()} is not called by the producer. So an exceptional
+ * return from the {@link IFrameWriter#open()} call must clean up all partially
+ * allocated resources.
+ *
+ * @author vinayakb
+ */
+public interface IFrameWriter {
+ /**
+ * First call to allocate any resources.
+ */
+ public void open() throws HyracksDataException;
+
+ /**
+ * Provide data to the stream of this {@link IFrameWriter}.
+ *
+ * @param buffer
+ * - Buffer containing data.
+ * @throws HyracksDataException
+ */
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
+
+ /**
+ * Indicate that a failure was encountered and the current stream is to be
+ * aborted.
+ *
+ * @throws HyracksDataException
+ */
+ public void fail() throws HyracksDataException;
+
+ /**
+ * Close this {@link IFrameWriter} and give up all resources.
+ *
+ * @throws HyracksDataException
+ */
+ public void close() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionCollector.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionCollector.java
new file mode 100644
index 0000000..89e6da3
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionCollector.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import java.util.Collection;
+
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public interface IPartitionCollector {
+ public JobId getJobId();
+
+ public ConnectorDescriptorId getConnectorId();
+
+ public int getReceiverIndex();
+
+ public void open() throws HyracksException;
+
+ public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException;
+
+ public IFrameReader getReader() throws HyracksException;
+
+ public void close() throws HyracksException;
+
+ public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException;
+
+ public void abort();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionWriterFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionWriterFactory.java
new file mode 100644
index 0000000..e15ec6d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionWriterFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IPartitionWriterFactory {
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
new file mode 100644
index 0000000..563fdc1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+public final class NetworkAddress implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final byte[] ipAddress;
+
+ private final int port;
+
+ public NetworkAddress(byte[] ipAddress, int port) {
+ this.ipAddress = ipAddress;
+ this.port = port;
+ }
+
+ public byte[] getIpAddress() {
+ return ipAddress;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public String toString() {
+ return Arrays.toString(ipAddress) + ":" + port;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(ipAddress) + port;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof NetworkAddress)) {
+ return false;
+ }
+ NetworkAddress on = (NetworkAddress) o;
+ return on.port == port && Arrays.equals(on.ipAddress, ipAddress);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/PartitionChannel.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/PartitionChannel.java
new file mode 100644
index 0000000..b75ed07
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/PartitionChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class PartitionChannel {
+ private PartitionId partitionId;
+
+ private IInputChannel channel;
+
+ public PartitionChannel(PartitionId partitionId, IInputChannel channel) {
+ this.partitionId = partitionId;
+ this.channel = channel;
+ }
+
+ public PartitionId getPartitionId() {
+ return partitionId;
+ }
+
+ public IInputChannel getInputChannel() {
+ return channel;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/Constraint.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/Constraint.java
new file mode 100644
index 0000000..0668e00
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/Constraint.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+
+public class Constraint implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final LValueConstraintExpression lValue;
+
+ private final ConstraintExpression rValue;
+
+ public Constraint(LValueConstraintExpression lValue, ConstraintExpression rValue) {
+ this.lValue = lValue;
+ this.rValue = rValue;
+ }
+
+ public LValueConstraintExpression getLValue() {
+ return lValue;
+ }
+
+ public ConstraintExpression getRValue() {
+ return rValue;
+ }
+
+ @Override
+ public String toString() {
+ return lValue + " in " + rValue;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/IConstraintAcceptor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/IConstraintAcceptor.java
new file mode 100644
index 0000000..2e41b0a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/IConstraintAcceptor.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+public interface IConstraintAcceptor {
+ public void addConstraint(Constraint constraint);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraintHelper.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraintHelper.java
new file mode 100644
index 0000000..0e4ca69
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraintHelper.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class PartitionConstraintHelper {
+ public static void addPartitionCountConstraint(JobSpecification spec, IOperatorDescriptor op, int count) {
+ spec.addUserConstraint(new Constraint(new PartitionCountExpression(op.getOperatorId()), new ConstantExpression(
+ count)));
+ }
+
+ public static void addLocationChoiceConstraint(JobSpecification spec, IOperatorDescriptor op, String[][] choices) {
+ addPartitionCountConstraint(spec, op, choices.length);
+ for (int i = 0; i < choices.length; ++i) {
+ spec.addUserConstraint(new Constraint(new PartitionLocationExpression(op.getOperatorId(), i),
+ new ConstantExpression(choices[i])));
+ }
+ }
+
+ public static void addAbsoluteLocationConstraint(JobSpecification spec, IOperatorDescriptor op, String... locations) {
+ addPartitionCountConstraint(spec, op, locations.length);
+ for (int i = 0; i < locations.length; ++i) {
+ spec.addUserConstraint(new Constraint(new PartitionLocationExpression(op.getOperatorId(), i),
+ new ConstantExpression(locations[i])));
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/ConstantExpression.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/ConstantExpression.java
new file mode 100644
index 0000000..6e10911
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/ConstantExpression.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints.expressions;
+
+import java.util.Collection;
+
+public class ConstantExpression extends ConstraintExpression {
+ private static final long serialVersionUID = 1L;
+
+ private final Object value;
+
+ public ConstantExpression(Object value) {
+ this.value = value;
+ }
+
+ @Override
+ public ExpressionTag getTag() {
+ return ExpressionTag.CONSTANT;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ @Override
+ public void getChildren(Collection<ConstraintExpression> children) {
+ }
+
+ @Override
+ protected void toString(StringBuilder buffer) {
+ buffer.append(getTag()).append('[').append(value).append(':').append(value.getClass().getName()).append(']');
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/ConstraintExpression.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/ConstraintExpression.java
new file mode 100644
index 0000000..811c048
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/ConstraintExpression.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints.expressions;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+public abstract class ConstraintExpression implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public enum ExpressionTag {
+ CONSTANT,
+ PARTITION_COUNT,
+ PARTITION_LOCATION,
+ }
+
+ public abstract ExpressionTag getTag();
+
+ public abstract void getChildren(Collection<ConstraintExpression> children);
+
+ @Override
+ public final String toString() {
+ StringBuilder buffer = new StringBuilder();
+ toString(buffer);
+ return buffer.toString();
+ }
+
+ protected abstract void toString(StringBuilder buffer);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/LValueConstraintExpression.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/LValueConstraintExpression.java
new file mode 100644
index 0000000..7557460
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/LValueConstraintExpression.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints.expressions;
+
+public abstract class LValueConstraintExpression extends ConstraintExpression {
+ private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/PartitionCountExpression.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/PartitionCountExpression.java
new file mode 100644
index 0000000..dc73947
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/PartitionCountExpression.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints.expressions;
+
+import java.util.Collection;
+
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+
+public final class PartitionCountExpression extends LValueConstraintExpression {
+ private static final long serialVersionUID = 1L;
+
+ private final OperatorDescriptorId opId;
+
+ public PartitionCountExpression(OperatorDescriptorId opId) {
+ this.opId = opId;
+ }
+
+ @Override
+ public ExpressionTag getTag() {
+ return ExpressionTag.PARTITION_COUNT;
+ }
+
+ public OperatorDescriptorId getOperatorDescriptorId() {
+ return opId;
+ }
+
+ @Override
+ public void getChildren(Collection<ConstraintExpression> children) {
+ }
+
+ @Override
+ protected void toString(StringBuilder buffer) {
+ buffer.append(getTag()).append('(').append(opId.toString()).append(')');
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((opId == null) ? 0 : opId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PartitionCountExpression other = (PartitionCountExpression) obj;
+ if (opId == null) {
+ if (other.opId != null)
+ return false;
+ } else if (!opId.equals(other.opId))
+ return false;
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/PartitionLocationExpression.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/PartitionLocationExpression.java
new file mode 100644
index 0000000..fbfdc1c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/PartitionLocationExpression.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints.expressions;
+
+import java.util.Collection;
+
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+
+public final class PartitionLocationExpression extends LValueConstraintExpression {
+ private static final long serialVersionUID = 1L;
+
+ private final OperatorDescriptorId opId;
+ private final int partition;
+
+ public PartitionLocationExpression(OperatorDescriptorId opId, int partition) {
+ this.opId = opId;
+ this.partition = partition;
+ }
+
+ @Override
+ public ExpressionTag getTag() {
+ return ExpressionTag.PARTITION_LOCATION;
+ }
+
+ public OperatorDescriptorId getOperatorDescriptorId() {
+ return opId;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ @Override
+ public void getChildren(Collection<ConstraintExpression> children) {
+ }
+
+ @Override
+ protected void toString(StringBuilder buffer) {
+ buffer.append(getTag()).append('(').append(opId.toString()).append(", ").append(partition).append(')');
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((opId == null) ? 0 : opId.hashCode());
+ result = prime * result + partition;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PartitionLocationExpression other = (PartitionLocationExpression) obj;
+ if (opId == null) {
+ if (other.opId != null)
+ return false;
+ } else if (!opId.equals(other.opId))
+ return false;
+ if (partition != other.partition)
+ return false;
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
new file mode 100644
index 0000000..1e49fe2
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+
+public interface ICCContext {
+ public ClusterControllerInfo getClusterControllerInfo();
+
+ public void getIPAddressNodeMap(Map<String, Set<String>> map) throws Exception;
+
+ public ClusterTopology getClusterTopology();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
new file mode 100644
index 0000000..9cb8f10
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.io.IIOManager;
+
+public interface IHyracksCommonContext {
+ public int getFrameSize();
+
+ public IIOManager getIOManager();
+
+ public ByteBuffer allocateFrame();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
new file mode 100644
index 0000000..fad4300
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
+
+public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry {
+ public INCApplicationContext getApplicationContext();
+
+ public JobId getJobId();
+
+ public ICounterContext getCounterContext();
+
+ public Object getGlobalJobData();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
new file mode 100644
index 0000000..c4989c5
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+
+public interface IHyracksRootContext {
+ public IIOManager getIOManager();
+
+ public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
new file mode 100644
index 0000000..e964d66
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
+
+public interface IHyracksTaskContext extends IHyracksCommonContext, IWorkspaceFileFactory, IDeallocatableRegistry,
+ IOperatorEnvironment {
+ public IHyracksJobletContext getJobletContext();
+
+ public TaskAttemptId getTaskAttemptId();
+
+ public ICounterContext getCounterContext();
+
+ public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
new file mode 100644
index 0000000..41b4c23
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+public final class ActivityId implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final OperatorDescriptorId odId;
+ private final int id;
+
+ public ActivityId(OperatorDescriptorId odId, int id) {
+ this.odId = odId;
+ this.id = id;
+ }
+
+ public OperatorDescriptorId getOperatorDescriptorId() {
+ return odId;
+ }
+
+ public int getLocalId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (odId.hashCode() + id);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ActivityId)) {
+ return false;
+ }
+ ActivityId other = (ActivityId) o;
+ return other.odId.equals(odId) && other.id == id;
+ }
+
+ public String toString() {
+ return "ANID:" + odId + ":" + id;
+ }
+
+ public static ActivityId parse(String str) {
+ if (str.startsWith("ANID:")) {
+ str = str.substring(5);
+ int idIdx = str.lastIndexOf(':');
+ return new ActivityId(OperatorDescriptorId.parse(str.substring(0, idIdx)), Integer.parseInt(str
+ .substring(idIdx + 1)));
+ }
+ throw new IllegalArgumentException("Unable to parse: " + str);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
new file mode 100644
index 0000000..3133a7f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+public final class ConnectorDescriptorId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private int id;
+
+ public ConnectorDescriptorId(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return id;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof ConnectorDescriptorId)) {
+ return false;
+ }
+ ConnectorDescriptorId other = (ConnectorDescriptorId) obj;
+ return id == other.id;
+ }
+
+ @Override
+ public String toString() {
+ return "CDID:" + id;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivity.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivity.java
new file mode 100644
index 0000000..df8fd53
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivity.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IActivity extends Serializable {
+ public ActivityId getActivityId();
+
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
new file mode 100644
index 0000000..bbda23c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+public interface IActivityGraphBuilder {
+ public void addActivity(IOperatorDescriptor op, IActivity task);
+
+ public void addBlockingEdge(IActivity blocker, IActivity blocked);
+
+ public void addSourceEdge(int operatorInputIndex, IActivity task, int taskInputIndex);
+
+ public void addTargetEdge(int operatorOutputIndex, IActivity task, int taskOutputIndex);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
new file mode 100644
index 0000000..3a6ee7a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+
+/**
+ * Connector that connects operators in a Job.
+ *
+ * @author vinayakb
+ */
+public interface IConnectorDescriptor extends Serializable {
+ /**
+ * Gets the id of the connector.
+ *
+ * @return
+ */
+ public ConnectorDescriptorId getConnectorId();
+
+ /**
+ * Factory method to create the send side writer that writes into this
+ * connector.
+ *
+ * @param ctx
+ * Context
+ * @param recordDesc
+ * Record Descriptor
+ * @param edwFactory
+ * Endpoint writer factory.
+ * @param index
+ * ordinal index of the data producer partition.
+ * @param nProducerPartitions
+ * Number of partitions of the producing operator.
+ * @param nConsumerPartitions
+ * Number of partitions of the consuming operator.
+ * @return data writer.
+ * @throws Exception
+ */
+ public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+ IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException;
+
+ /**
+ * Factory metod to create the receive side reader that reads data from this
+ * connector.
+ *
+ * @param ctx
+ * Context
+ * @param recordDesc
+ * Job plan
+ * @param receiverIndex
+ * ordinal index of the data consumer partition
+ * @param nProducerPartitions
+ * Number of partitions of the producing operator.
+ * @param nConsumerPartitions
+ * Number of partitions of the consuming operator.
+ * @return partition collector
+ * @throws HyracksDataException
+ */
+ public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+ int receiverIndex, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
+
+ /**
+ * Contribute any scheduling constraints imposed by this connector
+ *
+ * @param constraintAcceptor
+ * - Constraint Acceptor
+ * @param ac
+ * - Activity Cluster
+ */
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ActivityCluster ac,
+ ICCApplicationContext appCtx);
+
+ /**
+ * Indicate which consumer partitions may receive data from the given
+ * producer partition.
+ */
+ public void indicateTargetPartitions(int nProducerPartitions, int nConsumerPartitions, int producerIndex,
+ BitSet targetBitmap);
+
+ /**
+ * Indicate which producer partitions are required for the given receiver.
+ */
+ public void indicateSourcePartitions(int nProducerPartitions, int nConsumerPartitions, int consumerIndex,
+ BitSet sourceBitmap);
+
+ /**
+ * Gets the display name.
+ */
+ public String getDisplayName();
+
+ /**
+ * Sets the display name.
+ */
+ public void setDisplayName(String displayName);
+
+ /**
+ * Translate this connector descriptor to JSON.
+ *
+ * @return
+ * @throws JSONException
+ */
+ public JSONObject toJSON() throws JSONException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataReader.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataReader.java
new file mode 100644
index 0000000..b5f45cb
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataReader.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Provides data to a consumer.
+ *
+ * @author vinayakb
+ */
+public interface IDataReader<T> {
+ /**
+ * Reads one data item.
+ *
+ * @return data. <code>null</code> indicates end of stream.
+ * @throws Exception
+ */
+ public T readData() throws HyracksDataException;
+
+ /**
+ * Closes this reader.
+ *
+ * @throws Exception
+ */
+ public void close() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java
new file mode 100644
index 0000000..4fb6f18
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Accepts data from data producers.
+ *
+ * @author vinayakb
+ */
+public interface IDataWriter<T> {
+ /**
+ * Pushes data to the acceptor.
+ *
+ * @param data
+ * - Data pushed to the acceptor. <code>null</code> indicates the
+ * end of stream.
+ * @throws HyracksDataException
+ */
+ public void writeData(T data) throws HyracksDataException;
+
+ /**
+ * Indicates that the stream has failed.
+ *
+ * @throws HyracksDataException
+ */
+ public void fail() throws HyracksDataException;
+
+ /**
+ * Closes this writer.
+ *
+ * @throws Exception
+ */
+ public void close() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOpenableDataReader.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOpenableDataReader.java
new file mode 100644
index 0000000..d0b6474
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOpenableDataReader.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IOpenableDataReader<T> extends IDataReader<T> {
+ public void open() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOpenableDataWriter.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOpenableDataWriter.java
new file mode 100644
index 0000000..b2b15c6
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOpenableDataWriter.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IOpenableDataWriter<T> extends IDataWriter<T> {
+ public void open() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
new file mode 100644
index 0000000..43bf141
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Descriptor for operators in Hyracks.
+ *
+ * @author vinayakb
+ */
+public interface IOperatorDescriptor extends Serializable {
+ /**
+ * Returns the id of the operator.
+ *
+ * @return operator id
+ */
+ public OperatorDescriptorId getOperatorId();
+
+ /**
+ * Returns the number of inputs into this operator.
+ *
+ * @return Number of inputs.
+ */
+ public int getInputArity();
+
+ /**
+ * Returns the number of outputs out of this operator.
+ *
+ * @return Number of outputs.
+ */
+ public int getOutputArity();
+
+ /**
+ * Gets the output record descriptor
+ *
+ * @return Array of RecordDescriptor, one per output.
+ */
+ public RecordDescriptor[] getOutputRecordDescriptors();
+
+ /**
+ * Contributes the activity graph that describes the behavior of this
+ * operator.
+ *
+ * @param builder
+ * - graph builder
+ */
+ public void contributeActivities(IActivityGraphBuilder builder);
+
+ /**
+ * Contributes any scheduling constraints imposed by this operator.
+ *
+ * @param constraintAcceptor
+ * - Constraint Acceptor
+ * @param plan
+ * - Job Plan
+ */
+ public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx);
+
+ /**
+ * Gets the display name.
+ */
+ public String getDisplayName();
+
+ /**
+ * Sets the display name.
+ */
+ public void setDisplayName(String displayName);
+
+ /**
+ * Translates this operator descriptor to JSON.
+ */
+ public JSONObject toJSON() throws JSONException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePullable.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePullable.java
new file mode 100644
index 0000000..1fa039f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePullable.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+public interface IOperatorNodePullable extends IOpenableDataReader<Object[]> {
+ public void setDataReader(int index, IOpenableDataWriter<Object[]> writer);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
new file mode 100644
index 0000000..62797ac
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IOperatorNodePushable {
+ public void initialize() throws HyracksDataException;
+
+ public void deinitialize() throws HyracksDataException;
+
+ public int getInputArity();
+
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+
+ public IFrameWriter getInputFrameWriter(int index);
+
+ public String getDisplayName();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
new file mode 100644
index 0000000..8794e09
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+public final class OperatorDescriptorId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final int id;
+
+ public OperatorDescriptorId(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof OperatorDescriptorId)) {
+ return false;
+ }
+ return ((OperatorDescriptorId) o).id == id;
+ }
+
+ @Override
+ public String toString() {
+ return "ODID:" + id;
+ }
+
+ public static OperatorDescriptorId parse(String str) {
+ if (str.startsWith("ODID:")) {
+ str = str.substring(5);
+ return new OperatorDescriptorId(Integer.parseInt(str));
+ }
+ throw new IllegalArgumentException("Unable to parse: " + str);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorInstanceId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorInstanceId.java
new file mode 100644
index 0000000..f6ee39b
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorInstanceId.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+public final class OperatorInstanceId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private OperatorDescriptorId odId;
+ private int partition;
+
+ public OperatorInstanceId(OperatorDescriptorId odId, int partition) {
+ this.odId = odId;
+ this.partition = partition;
+ }
+
+ public OperatorDescriptorId getOperatorId() {
+ return odId;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((odId == null) ? 0 : odId.hashCode());
+ result = prime * result + partition;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ OperatorInstanceId other = (OperatorInstanceId) obj;
+ if (odId == null) {
+ if (other.odId != null)
+ return false;
+ } else if (!odId.equals(other.odId))
+ return false;
+ if (partition != other.partition)
+ return false;
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
new file mode 100644
index 0000000..0fb44c1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+public final class TaskAttemptId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final TaskId taskId;
+
+ private final int attempt;
+
+ public TaskAttemptId(TaskId taskId, int attempt) {
+ this.taskId = taskId;
+ this.attempt = attempt;
+ }
+
+ public TaskId getTaskId() {
+ return taskId;
+ }
+
+ public int getAttempt() {
+ return attempt;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TaskAttemptId)) {
+ return false;
+ }
+ TaskAttemptId oTaskId = (TaskAttemptId) o;
+ return oTaskId.attempt == attempt && oTaskId.taskId.equals(taskId);
+ }
+
+ @Override
+ public int hashCode() {
+ return taskId.hashCode() + attempt;
+ }
+
+ @Override
+ public String toString() {
+ return "TAID:" + taskId + ":" + attempt;
+ }
+
+ public static TaskAttemptId parse(String str) {
+ if (str.startsWith("TAID:")) {
+ str = str.substring(5);
+ int idIdx = str.lastIndexOf(':');
+ return new TaskAttemptId(TaskId.parse(str.substring(0, idIdx)), Integer.parseInt(str.substring(idIdx + 1)));
+ }
+ throw new IllegalArgumentException("Unable to parse: " + str);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
new file mode 100644
index 0000000..7e0b22d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+public final class TaskId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final ActivityId activityId;
+
+ private final int partition;
+
+ public TaskId(ActivityId activityId, int partition) {
+ this.activityId = activityId;
+ this.partition = partition;
+ }
+
+ public ActivityId getActivityId() {
+ return activityId;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TaskId)) {
+ return false;
+ }
+ TaskId oTaskId = (TaskId) o;
+ return oTaskId.partition == partition && oTaskId.activityId.equals(activityId);
+ }
+
+ @Override
+ public int hashCode() {
+ return activityId.hashCode() + partition;
+ }
+
+ @Override
+ public String toString() {
+ return "TID:" + activityId + ":" + partition;
+ }
+
+ public static TaskId parse(String str) {
+ if (str.startsWith("TID:")) {
+ str = str.substring(4);
+ int idIdx = str.lastIndexOf(':');
+ return new TaskId(ActivityId.parse(str.substring(0, idIdx)), Integer.parseInt(str.substring(idIdx + 1)));
+ }
+ throw new IllegalArgumentException("Unable to parse: " + str);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicy.java
new file mode 100644
index 0000000..d6d6fee
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicy.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+import java.io.Serializable;
+
+public interface IConnectorPolicy extends Serializable {
+ public boolean requiresProducerConsumerCoscheduling();
+
+ public boolean consumerWaitsForProducerToFinish();
+
+ public boolean materializeOnSendSide();
+
+ public boolean materializeOnReceiveSide();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicyAssignmentPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..ca684e4
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+
+public interface IConnectorPolicyAssignmentPolicy extends Serializable {
+ public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+ int[] fanouts);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipeliningConnectorPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipeliningConnectorPolicy.java
new file mode 100644
index 0000000..b55947e
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipeliningConnectorPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+public final class PipeliningConnectorPolicy implements IConnectorPolicy {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean requiresProducerConsumerCoscheduling() {
+ return true;
+ }
+
+ @Override
+ public boolean consumerWaitsForProducerToFinish() {
+ return false;
+ }
+
+ @Override
+ public boolean materializeOnSendSide() {
+ return false;
+ }
+
+ @Override
+ public boolean materializeOnReceiveSide() {
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedBlockingConnectorPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedBlockingConnectorPolicy.java
new file mode 100644
index 0000000..a5017de
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedBlockingConnectorPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+public final class SendSideMaterializedBlockingConnectorPolicy implements IConnectorPolicy {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean requiresProducerConsumerCoscheduling() {
+ return false;
+ }
+
+ @Override
+ public boolean consumerWaitsForProducerToFinish() {
+ return true;
+ }
+
+ @Override
+ public boolean materializeOnSendSide() {
+ return true;
+ }
+
+ @Override
+ public boolean materializeOnReceiveSide() {
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedPipeliningConnectorPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedPipeliningConnectorPolicy.java
new file mode 100644
index 0000000..5d522c0
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedPipeliningConnectorPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+public final class SendSideMaterializedPipeliningConnectorPolicy implements IConnectorPolicy {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean requiresProducerConsumerCoscheduling() {
+ return false;
+ }
+
+ @Override
+ public boolean consumerWaitsForProducerToFinish() {
+ return false;
+ }
+
+ @Override
+ public boolean materializeOnSendSide() {
+ return true;
+ }
+
+ @Override
+ public boolean materializeOnReceiveSide() {
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy.java
new file mode 100644
index 0000000..82608c4
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+public final class SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy implements IConnectorPolicy {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean requiresProducerConsumerCoscheduling() {
+ return false;
+ }
+
+ @Override
+ public boolean consumerWaitsForProducerToFinish() {
+ return true;
+ }
+
+ @Override
+ public boolean materializeOnSendSide() {
+ return true;
+ }
+
+ @Override
+ public boolean materializeOnReceiveSide() {
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy.java
new file mode 100644
index 0000000..1cc0583
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+public final class SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy implements IConnectorPolicy {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean requiresProducerConsumerCoscheduling() {
+ return false;
+ }
+
+ @Override
+ public boolean consumerWaitsForProducerToFinish() {
+ return false;
+ }
+
+ @Override
+ public boolean materializeOnSendSide() {
+ return true;
+ }
+
+ @Override
+ public boolean materializeOnReceiveSide() {
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/IStateObject.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/IStateObject.java
new file mode 100644
index 0000000..f216773
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/IStateObject.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.state;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public interface IStateObject {
+ public JobId getJobId();
+
+ public Object getId();
+
+ public long getMemoryOccupancy();
+
+ public void toBytes(DataOutput out) throws IOException;
+
+ public void fromBytes(DataInput in) throws IOException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryComparator.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryComparator.java
new file mode 100644
index 0000000..767684d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryComparator.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+public interface IBinaryComparator {
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryComparatorFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryComparatorFactory.java
new file mode 100644
index 0000000..9976818
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryComparatorFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IBinaryComparatorFactory extends Serializable {
+ public IBinaryComparator createBinaryComparator();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunction.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunction.java
new file mode 100644
index 0000000..a995703
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunction.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+public interface IBinaryHashFunction {
+ int hash(byte[] bytes, int offset, int length);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunctionFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..a064d5f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunctionFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IBinaryHashFunctionFactory extends Serializable {
+ public IBinaryHashFunction createBinaryHashFunction();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunctionFamily.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunctionFamily.java
new file mode 100644
index 0000000..5cb27d6
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunctionFamily.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IBinaryHashFunctionFamily extends Serializable {
+ public IBinaryHashFunction createBinaryHashFunction(int seed);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IComparator.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IComparator.java
new file mode 100644
index 0000000..2d24496
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IComparator.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+public interface IComparator<T> {
+ public int compare(T o1, T o2);
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IComparatorFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IComparatorFactory.java
new file mode 100644
index 0000000..ef9161f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IComparatorFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IComparatorFactory<T> extends Serializable {
+ public IComparator<T> createComparator();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IHashFunction.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IHashFunction.java
new file mode 100644
index 0000000..9124fca
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IHashFunction.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IHashFunction<T> {
+ public int hash(T o) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IHashFunctionFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IHashFunctionFactory.java
new file mode 100644
index 0000000..b67a96c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IHashFunctionFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IHashFunctionFactory<T> extends Serializable {
+ public IHashFunction<T> createHashFunction();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INormalizedKeyComputer.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INormalizedKeyComputer.java
new file mode 100644
index 0000000..0a8fdcd
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INormalizedKeyComputer.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+public interface INormalizedKeyComputer {
+ public int normalize(byte[] bytes, int start, int length);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..d58632c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INormalizedKeyComputerFactory extends Serializable {
+ public INormalizedKeyComputer createNormalizedKeyComputer();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
new file mode 100644
index 0000000..7552c17
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface INullWriter {
+ public void writeNull(DataOutput out) throws HyracksDataException;
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
new file mode 100644
index 0000000..6d9a744
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INullWriterFactory extends Serializable {
+ public INullWriter createNullWriter();
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
new file mode 100644
index 0000000..c480446
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+
+public interface IRecordDescriptorProvider {
+ public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex);
+
+ public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ISerializerDeserializer.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ISerializerDeserializer.java
new file mode 100644
index 0000000..33ba602
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ISerializerDeserializer.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ISerializerDeserializer<T> extends Serializable {
+ /**
+ * Deserialization method.
+ *
+ * @param in
+ * - Stream to read instance from.
+ * @return A new instance of T with data.
+ */
+ public T deserialize(DataInput in) throws HyracksDataException;
+
+ /**
+ * Serialization method.
+ *
+ * @param instance
+ * - the instance to serialize.
+ * @param out
+ * - Stream to write data to.
+ */
+ public void serialize(T instance, DataOutput out) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
new file mode 100644
index 0000000..3251944
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITuplePairComparator {
+
+ public int compare(IFrameTupleAccessor outerRef, int outerIndex, IFrameTupleAccessor innerRef, int innerIndex)
+ throws HyracksDataException;
+
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
new file mode 100644
index 0000000..297b56c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public interface ITuplePairComparatorFactory extends Serializable {
+ public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputer.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputer.java
new file mode 100644
index 0000000..baf9d5c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputer.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITuplePartitionComputer {
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
new file mode 100644
index 0000000..eb2bbe8
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITuplePartitionComputerFactory extends Serializable {
+ public ITuplePartitionComputer createPartitioner();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputerFamily.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputerFamily.java
new file mode 100644
index 0000000..1839dc1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputerFamily.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITuplePartitionComputerFamily extends Serializable {
+ public ITuplePartitionComputer createPartitioner(int seed);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITypeTraits.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITypeTraits.java
new file mode 100644
index 0000000..4a6f826
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITypeTraits.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITypeTraits extends Serializable {
+ public boolean isFixedLength();
+
+ public int getFixedLength();
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java
new file mode 100644
index 0000000..b04fe86
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+@SuppressWarnings("unchecked")
+public final class RecordDescriptor implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final ISerializerDeserializer[] fields;
+ private final ITypeTraits[] typeTraits;
+
+ // leaving this constructor for backwards-compatibility
+ public RecordDescriptor(ISerializerDeserializer[] fields) {
+ this.fields = fields;
+ this.typeTraits = null;
+ }
+
+ // temporarily adding constructor to include type traits
+ public RecordDescriptor(ISerializerDeserializer[] fields, ITypeTraits[] typeTraits) {
+ this.fields = fields;
+ this.typeTraits = typeTraits;
+ }
+
+ public int getFieldCount() {
+ return fields.length;
+ }
+
+ public ISerializerDeserializer[] getFields() {
+ return fields;
+ }
+
+ public ITypeTraits[] getTypeTraits() {
+ return typeTraits;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
new file mode 100644
index 0000000..c6e6cd3
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.exceptions;
+
+public class HyracksDataException extends HyracksException {
+ private static final long serialVersionUID = 1L;
+
+ public HyracksDataException() {
+ }
+
+ public HyracksDataException(String message) {
+ super(message);
+ }
+
+ public HyracksDataException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public HyracksDataException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java
new file mode 100644
index 0000000..43913ce
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.hyracks.api.exceptions;
+
+import java.io.IOException;
+
+public class HyracksException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public HyracksException() {
+ }
+
+ public HyracksException(String message) {
+ super(message);
+ }
+
+ public HyracksException(Throwable cause) {
+ super(cause);
+ }
+
+ public HyracksException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/FileReference.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/FileReference.java
new file mode 100644
index 0000000..9e27077
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/FileReference.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import java.io.File;
+import java.io.Serializable;
+
+public final class FileReference implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final File file;
+
+ public FileReference(IODeviceHandle dev, String devRelPath) {
+ file = new File(dev.getPath(), devRelPath);
+ }
+
+ public FileReference(File file) {
+ this.file = file;
+ }
+
+ public File getFile() {
+ return file;
+ }
+
+ @Override
+ public String toString() {
+ return file.getAbsolutePath();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof FileReference)) {
+ return false;
+ }
+ return file.equals(((FileReference) o).file);
+ }
+
+ @Override
+ public int hashCode() {
+ return file.hashCode();
+ }
+
+ public void delete() {
+ file.delete();
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IFileHandle.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IFileHandle.java
new file mode 100644
index 0000000..10fcef0
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IFileHandle.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+/**
+ * IFileHandle is an interface that exists only for Java compilers to perform static typing
+ * when handling file handle objects. Users must not implement this interface.
+ */
+public interface IFileHandle {
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOFuture.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOFuture.java
new file mode 100644
index 0000000..45b919f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOFuture.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IIOFuture {
+ public int synchronize() throws HyracksDataException, InterruptedException;
+
+ public boolean isComplete();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
new file mode 100644
index 0000000..2e7feb5
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IIOManager {
+ public enum FileReadWriteMode {
+ READ_ONLY,
+ READ_WRITE
+ }
+
+ public enum FileSyncMode {
+ METADATA_SYNC_DATA_SYNC,
+ METADATA_ASYNC_DATA_SYNC,
+ METADATA_ASYNC_DATA_ASYNC
+ }
+
+ public List<IODeviceHandle> getIODevices();
+
+ public IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
+ throws HyracksDataException;
+
+ public int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+
+ public int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+
+ public IIOFuture asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data);
+
+ public IIOFuture asyncRead(IFileHandle fHandle, long offset, ByteBuffer data);
+
+ public void close(IFileHandle fHandle) throws HyracksDataException;
+
+ public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IODeviceHandle.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IODeviceHandle.java
new file mode 100644
index 0000000..16ff19c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IODeviceHandle.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import java.io.File;
+import java.io.Serializable;
+
+public class IODeviceHandle implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final File path;
+
+ private final String workAreaPath;
+
+ public IODeviceHandle(File path, String workAreaPath) {
+ this.path = path;
+ workAreaPath = workAreaPath.trim();
+ if (workAreaPath.endsWith(File.separator)) {
+ workAreaPath = workAreaPath.substring(0, workAreaPath.length() - 1);
+ }
+ this.workAreaPath = workAreaPath;
+ }
+
+ public File getPath() {
+ return path;
+ }
+
+ public String getWorkAreaPath() {
+ return workAreaPath;
+ }
+
+ public FileReference createFileReference(String relPath) {
+ return new FileReference(this, relPath);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWorkspaceFileFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWorkspaceFileFactory.java
new file mode 100644
index 0000000..7b9a7c4
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWorkspaceFileFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IWorkspaceFileFactory {
+ public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException;
+
+ public FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
new file mode 100644
index 0000000..6698ff7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public final class ActivityCluster implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final ActivityClusterGraph acg;
+
+ private final ActivityClusterId id;
+
+ private final List<IActivity> roots;
+
+ private final Map<ActivityId, IActivity> activities;
+
+ private final Map<ConnectorDescriptorId, IConnectorDescriptor> connectors;
+
+ private final Map<ConnectorDescriptorId, RecordDescriptor> connectorRecordDescriptorMap;
+
+ private final Map<ActivityId, List<IConnectorDescriptor>> activityInputMap;
+
+ private final Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap;
+
+ private final Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap;
+
+ private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
+
+ private final List<ActivityCluster> dependencies;
+
+ private IConnectorPolicyAssignmentPolicy cpap;
+
+ public ActivityCluster(ActivityClusterGraph acg, ActivityClusterId id) {
+ this.acg = acg;
+ this.id = id;
+ roots = new ArrayList<IActivity>();
+ activities = new HashMap<ActivityId, IActivity>();
+ connectors = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
+ connectorRecordDescriptorMap = new HashMap<ConnectorDescriptorId, RecordDescriptor>();
+ activityInputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+ activityOutputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+ connectorActivityMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
+ blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+ dependencies = new ArrayList<ActivityCluster>();
+ }
+
+ public ActivityClusterGraph getActivityClusterGraph() {
+ return acg;
+ }
+
+ public ActivityClusterId getId() {
+ return id;
+ }
+
+ public void addRoot(IActivity activity) {
+ roots.add(activity);
+ }
+
+ public void addActivity(IActivity activity) {
+ activities.put(activity.getActivityId(), activity);
+ }
+
+ public void addConnector(IConnectorDescriptor connector) {
+ connectors.put(connector.getConnectorId(), connector);
+ }
+
+ public void connect(IConnectorDescriptor connector, IActivity producerActivity, int producerPort,
+ IActivity consumerActivity, int consumerPort, RecordDescriptor recordDescriptor) {
+ if (!activities.containsKey(producerActivity.getActivityId())
+ || !activities.containsKey(consumerActivity.getActivityId())) {
+ throw new IllegalStateException("Connected Activities belong to different Activity Clusters: "
+ + producerActivity.getActivityId() + " and " + consumerActivity.getActivityId());
+ }
+ insertIntoIndexedMap(activityInputMap, consumerActivity.getActivityId(), consumerPort, connector);
+ insertIntoIndexedMap(activityOutputMap, producerActivity.getActivityId(), producerPort, connector);
+ connectorActivityMap.put(
+ connector.getConnectorId(),
+ Pair.<Pair<IActivity, Integer>, Pair<IActivity, Integer>> of(
+ Pair.<IActivity, Integer> of(producerActivity, producerPort),
+ Pair.<IActivity, Integer> of(consumerActivity, consumerPort)));
+ connectorRecordDescriptorMap.put(connector.getConnectorId(), recordDescriptor);
+ }
+
+ public List<IActivity> getRoots() {
+ return roots;
+ }
+
+ public Map<ActivityId, IActivity> getActivityMap() {
+ return activities;
+ }
+
+ public Map<ConnectorDescriptorId, IConnectorDescriptor> getConnectorMap() {
+ return connectors;
+ }
+
+ public Map<ConnectorDescriptorId, RecordDescriptor> getConnectorRecordDescriptorMap() {
+ return connectorRecordDescriptorMap;
+ }
+
+ public Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> getConnectorActivityMap() {
+ return connectorActivityMap;
+ }
+
+ public Map<ActivityId, List<IConnectorDescriptor>> getActivityInputMap() {
+ return activityInputMap;
+ }
+
+ public Map<ActivityId, List<IConnectorDescriptor>> getActivityOutputMap() {
+ return activityOutputMap;
+ }
+
+ public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) {
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+ return connEdge.getRight().getLeft().getActivityId();
+ }
+
+ public ActivityId getProducerActivity(ConnectorDescriptorId cdId) {
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+ return connEdge.getLeft().getLeft().getActivityId();
+ }
+
+ public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() {
+ return blocked2blockerMap;
+ }
+
+ public List<ActivityCluster> getDependencies() {
+ return dependencies;
+ }
+
+ public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+ return cpap;
+ }
+
+ public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy cpap) {
+ this.cpap = cpap;
+ }
+
+ private <T> void extend(List<T> list, int index) {
+ int n = list.size();
+ for (int i = n; i <= index; ++i) {
+ list.add(null);
+ }
+ }
+
+ private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
+ List<V> vList = map.get(key);
+ if (vList == null) {
+ vList = new ArrayList<V>();
+ map.put(key, vList);
+ }
+ extend(vList, index);
+ vList.set(index, value);
+ }
+
+ public JSONObject toJSON() throws JSONException {
+ JSONObject jac = new JSONObject();
+
+ JSONArray jans = new JSONArray();
+ for (IActivity an : activities.values()) {
+ JSONObject jan = new JSONObject();
+ jan.put("id", an.getActivityId().toString());
+ jan.put("java-class", an.getClass().getName());
+
+ List<IConnectorDescriptor> inputs = activityInputMap.get(an.getActivityId());
+ if (inputs != null) {
+ JSONArray jInputs = new JSONArray();
+ for (int i = 0; i < inputs.size(); ++i) {
+ JSONObject jInput = new JSONObject();
+ jInput.put("input-port", i);
+ jInput.put("connector-id", inputs.get(i).getConnectorId().toString());
+ jInputs.put(jInput);
+ }
+ jan.put("inputs", jInputs);
+ }
+
+ List<IConnectorDescriptor> outputs = activityOutputMap.get(an.getActivityId());
+ if (outputs != null) {
+ JSONArray jOutputs = new JSONArray();
+ for (int i = 0; i < outputs.size(); ++i) {
+ JSONObject jOutput = new JSONObject();
+ jOutput.put("output-port", i);
+ jOutput.put("connector-id", outputs.get(i).getConnectorId().toString());
+ jOutputs.put(jOutput);
+ }
+ jan.put("outputs", jOutputs);
+ }
+
+ Set<ActivityId> blockers = getBlocked2BlockerMap().get(an.getActivityId());
+ if (blockers != null) {
+ JSONArray jDeps = new JSONArray();
+ for (ActivityId blocker : blockers) {
+ jDeps.put(blocker.toString());
+ }
+ jan.put("depends-on", jDeps);
+ }
+ jans.put(jan);
+ }
+ jac.put("activities", jans);
+
+ return jac;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
new file mode 100644
index 0000000..32c93cd
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -0,0 +1,149 @@
+/*
+2 * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+
+public class ActivityClusterGraph implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private int version;
+
+ private final Map<ActivityClusterId, ActivityCluster> activityClusterMap;
+
+ private final Map<ActivityId, ActivityCluster> activityMap;
+
+ private final Map<ConnectorDescriptorId, ActivityCluster> connectorMap;
+
+ private int frameSize;
+
+ private int maxReattempts;
+
+ private IJobletEventListenerFactory jobletEventListenerFactory;
+
+ private IGlobalJobDataFactory globalJobDataFactory;
+
+ private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
+
+ private boolean useConnectorPolicyForScheduling;
+
+ public ActivityClusterGraph() {
+ version = 0;
+ activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
+ activityMap = new HashMap<ActivityId, ActivityCluster>();
+ connectorMap = new HashMap<ConnectorDescriptorId, ActivityCluster>();
+ frameSize = 32768;
+ }
+
+ public Map<ActivityId, ActivityCluster> getActivityMap() {
+ return activityMap;
+ }
+
+ public Map<ConnectorDescriptorId, ActivityCluster> getConnectorMap() {
+ return connectorMap;
+ }
+
+ public Map<ActivityClusterId, ActivityCluster> getActivityClusterMap() {
+ return activityClusterMap;
+ }
+
+ public void addActivityClusters(Collection<ActivityCluster> newActivityClusters) {
+ for (ActivityCluster ac : newActivityClusters) {
+ activityClusterMap.put(ac.getId(), ac);
+ for (ActivityId aid : ac.getActivityMap().keySet()) {
+ activityMap.put(aid, ac);
+ }
+ for (ConnectorDescriptorId cid : ac.getConnectorMap().keySet()) {
+ connectorMap.put(cid, ac);
+ }
+ }
+ ++version;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setFrameSize(int frameSize) {
+ this.frameSize = frameSize;
+ }
+
+ public int getFrameSize() {
+ return frameSize;
+ }
+
+ public void setMaxReattempts(int maxReattempts) {
+ this.maxReattempts = maxReattempts;
+ }
+
+ public int getMaxReattempts() {
+ return maxReattempts;
+ }
+
+ public IJobletEventListenerFactory getJobletEventListenerFactory() {
+ return jobletEventListenerFactory;
+ }
+
+ public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
+ this.jobletEventListenerFactory = jobletEventListenerFactory;
+ }
+
+ public IGlobalJobDataFactory getGlobalJobDataFactory() {
+ return globalJobDataFactory;
+ }
+
+ public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
+ this.globalJobDataFactory = globalJobDataFactory;
+ }
+
+ public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+ return connectorPolicyAssignmentPolicy;
+ }
+
+ public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy) {
+ this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
+ }
+
+ public boolean isUseConnectorPolicyForScheduling() {
+ return useConnectorPolicyForScheduling;
+ }
+
+ public void setUseConnectorPolicyForScheduling(boolean useConnectorPolicyForScheduling) {
+ this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
+ }
+
+ public JSONObject toJSON() throws JSONException {
+ JSONObject acgj = new JSONObject();
+
+ JSONArray acl = new JSONArray();
+ for (ActivityCluster ac : activityClusterMap.values()) {
+ acl.put(ac.toJSON());
+ }
+ acgj.put("version", version);
+ acgj.put("activity-clusters", acl);
+ return acgj;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterId.java
new file mode 100644
index 0000000..cba63959
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterId.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+public final class ActivityClusterId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final int id;
+
+ public ActivityClusterId(JobId jobId, int id) {
+ this.jobId = jobId;
+ this.id = id;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + id;
+ result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ ActivityClusterId other = (ActivityClusterId) obj;
+ if (id != other.id) {
+ return false;
+ }
+ if (jobId == null) {
+ if (other.jobId != null) {
+ return false;
+ }
+ } else if (!jobId.equals(other.jobId)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "ACID:" + jobId + ":" + id;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGenerator.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGenerator.java
new file mode 100644
index 0000000..b837066
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGenerator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+
+public interface IActivityClusterGraphGenerator {
+ public Set<Constraint> getConstraints();
+
+ public ActivityClusterGraph initialize();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
new file mode 100644
index 0000000..d801dd1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+public interface IActivityClusterGraphGeneratorFactory extends Serializable {
+ public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(String appName, JobId jobId,
+ ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IConnectorDescriptorRegistry.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IConnectorDescriptorRegistry.java
new file mode 100644
index 0000000..65b27e3
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IConnectorDescriptorRegistry.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+
+public interface IConnectorDescriptorRegistry {
+ public ConnectorDescriptorId createConnectorDescriptor(IConnectorDescriptor conn);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IGlobalJobDataFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IGlobalJobDataFactory.java
new file mode 100644
index 0000000..bee495a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IGlobalJobDataFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+
+public interface IGlobalJobDataFactory extends Serializable {
+ public Object createGlobalJobData(IHyracksJobletContext ctx);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
new file mode 100644
index 0000000..2f0f025
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+public interface IJobLifecycleListener {
+ public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException;
+
+ public void notifyJobStart(JobId jobId) throws HyracksException;
+
+ public void notifyJobFinish(JobId jobId) throws HyracksException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
new file mode 100644
index 0000000..c83e333
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+public interface IJobletEventListener {
+ public void jobletStart();
+
+ public void jobletFinish(JobStatus status);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java
new file mode 100644
index 0000000..3b3cacb
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+
+public interface IJobletEventListenerFactory extends Serializable {
+ public IJobletEventListener createListener(IHyracksJobletContext ctx);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorDescriptorRegistry.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorDescriptorRegistry.java
new file mode 100644
index 0000000..4ea84d6
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorDescriptorRegistry.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+
+public interface IOperatorDescriptorRegistry {
+ public OperatorDescriptorId createOperatorDescriptorId(IOperatorDescriptor op);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
new file mode 100644
index 0000000..eaa1ec2
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+
+public interface IOperatorEnvironment {
+ public void setStateObject(IStateObject taskState);
+
+ public IStateObject getStateObject(Object id);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
new file mode 100644
index 0000000..5b62b5c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class JobActivityGraph implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final Map<ActivityId, IActivity> activityMap;
+
+ private final Map<ConnectorDescriptorId, IConnectorDescriptor> connectorMap;
+
+ private final Map<ConnectorDescriptorId, RecordDescriptor> connectorRecordDescriptorMap;
+
+ private final Map<ActivityId, List<IConnectorDescriptor>> activityInputMap;
+
+ private final Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap;
+
+ private final Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap;
+
+ private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
+
+ public JobActivityGraph() {
+ activityMap = new HashMap<ActivityId, IActivity>();
+ connectorMap = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
+ connectorRecordDescriptorMap = new HashMap<ConnectorDescriptorId, RecordDescriptor>();
+ activityInputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+ activityOutputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+ connectorActivityMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
+ blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+ }
+
+ public Map<ActivityId, IActivity> getActivityMap() {
+ return activityMap;
+ }
+
+ public Map<ConnectorDescriptorId, IConnectorDescriptor> getConnectorMap() {
+ return connectorMap;
+ }
+
+ public Map<ConnectorDescriptorId, RecordDescriptor> getConnectorRecordDescriptorMap() {
+ return connectorRecordDescriptorMap;
+ }
+
+ public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() {
+ return blocked2blockerMap;
+ }
+
+ public Map<ActivityId, List<IConnectorDescriptor>> getActivityInputMap() {
+ return activityInputMap;
+ }
+
+ public Map<ActivityId, List<IConnectorDescriptor>> getActivityOutputMap() {
+ return activityOutputMap;
+ }
+
+ public Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> getConnectorActivityMap() {
+ return connectorActivityMap;
+ }
+
+ public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) {
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+ return connEdge.getRight().getLeft().getActivityId();
+ }
+
+ public ActivityId getProducerActivity(ConnectorDescriptorId cdId) {
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+ return connEdge.getLeft().getLeft().getActivityId();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("ActivityNodes: " + activityMap);
+ buffer.append('\n');
+ buffer.append("Blocker->Blocked: " + blocked2blockerMap);
+ buffer.append('\n');
+ return buffer.toString();
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java
new file mode 100644
index 0000000..2b9787d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+public enum JobFlag {
+ PROFILE_RUNTIME
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
new file mode 100644
index 0000000..ffb9bba
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+public final class JobId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final long id;
+
+ public JobId(long id) {
+ this.id = id;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof JobId)) {
+ return false;
+ }
+ return ((JobId) o).id == id;
+ }
+
+ @Override
+ public String toString() {
+ return "JID:" + id;
+ }
+
+ public static JobId parse(String str) {
+ if (str.startsWith("JID:")) {
+ str = str.substring(4);
+ return new JobId(Long.parseLong(str));
+ }
+ throw new IllegalArgumentException();
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
new file mode 100644
index 0000000..7c523f1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class JobSpecification implements Serializable, IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
+ private static final long serialVersionUID = 1L;
+
+ private final List<OperatorDescriptorId> roots;
+
+ private final Map<OperatorDescriptorId, IOperatorDescriptor> opMap;
+
+ private final Map<ConnectorDescriptorId, IConnectorDescriptor> connMap;
+
+ private final Map<OperatorDescriptorId, List<IConnectorDescriptor>> opInputMap;
+
+ private final Map<OperatorDescriptorId, List<IConnectorDescriptor>> opOutputMap;
+
+ private final Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> connectorOpMap;
+
+ private final Map<String, Serializable> properties;
+
+ private final Set<Constraint> userConstraints;
+
+ private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
+
+ private int frameSize;
+
+ private int maxReattempts;
+
+ private IJobletEventListenerFactory jobletEventListenerFactory;
+
+ private IGlobalJobDataFactory globalJobDataFactory;
+
+ private boolean useConnectorPolicyForScheduling;
+
+ private transient int operatorIdCounter;
+
+ private transient int connectorIdCounter;
+
+ public JobSpecification() {
+ roots = new ArrayList<OperatorDescriptorId>();
+ opMap = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
+ connMap = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
+ opInputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
+ opOutputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
+ connectorOpMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>>();
+ properties = new HashMap<String, Serializable>();
+ userConstraints = new HashSet<Constraint>();
+ operatorIdCounter = 0;
+ connectorIdCounter = 0;
+ frameSize = 32768;
+ maxReattempts = 2;
+ useConnectorPolicyForScheduling = true;
+ }
+
+ @Override
+ public OperatorDescriptorId createOperatorDescriptorId(IOperatorDescriptor op) {
+ OperatorDescriptorId odId = new OperatorDescriptorId(operatorIdCounter++);
+ opMap.put(odId, op);
+ return odId;
+ }
+
+ @Override
+ public ConnectorDescriptorId createConnectorDescriptor(IConnectorDescriptor conn) {
+ ConnectorDescriptorId cdId = new ConnectorDescriptorId(connectorIdCounter++);
+ connMap.put(cdId, conn);
+ return cdId;
+ }
+
+ public void addRoot(IOperatorDescriptor op) {
+ roots.add(op.getOperatorId());
+ }
+
+ public void connect(IConnectorDescriptor conn, IOperatorDescriptor producerOp, int producerPort,
+ IOperatorDescriptor consumerOp, int consumerPort) {
+ insertIntoIndexedMap(opInputMap, consumerOp.getOperatorId(), consumerPort, conn);
+ insertIntoIndexedMap(opOutputMap, producerOp.getOperatorId(), producerPort, conn);
+ connectorOpMap.put(
+ conn.getConnectorId(),
+ Pair.<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> of(
+ Pair.<IOperatorDescriptor, Integer> of(producerOp, producerPort),
+ Pair.<IOperatorDescriptor, Integer> of(consumerOp, consumerPort)));
+ }
+
+ public void setProperty(String name, Serializable value) {
+ properties.put(name, value);
+ }
+
+ public Serializable getProperty(String name) {
+ return properties.get(name);
+ }
+
+ private <T> void extend(List<T> list, int index) {
+ int n = list.size();
+ for (int i = n; i <= index; ++i) {
+ list.add(null);
+ }
+ }
+
+ public Map<ConnectorDescriptorId, IConnectorDescriptor> getConnectorMap() {
+ return connMap;
+ }
+
+ public Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> getConnectorOperatorMap() {
+ return connectorOpMap;
+ }
+
+ public RecordDescriptor getConnectorRecordDescriptor(IConnectorDescriptor conn) {
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
+ .getConnectorId());
+ return connInfo.getLeft().getLeft().getOutputRecordDescriptors()[connInfo.getLeft().getRight()];
+ }
+
+ public IOperatorDescriptor getConsumer(IConnectorDescriptor conn) {
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
+ .getConnectorId());
+ return connInfo.getRight().getLeft();
+ }
+
+ public int getConsumerInputIndex(IConnectorDescriptor conn) {
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
+ .getConnectorId());
+ return connInfo.getRight().getRight();
+ }
+
+ public IConnectorDescriptor getInputConnectorDescriptor(IOperatorDescriptor op, int inputIndex) {
+ return getInputConnectorDescriptor(op.getOperatorId(), inputIndex);
+ }
+
+ public IConnectorDescriptor getInputConnectorDescriptor(OperatorDescriptorId odId, int inputIndex) {
+ return opInputMap.get(odId).get(inputIndex);
+ }
+
+ public Map<OperatorDescriptorId, List<IConnectorDescriptor>> getOperatorInputMap() {
+ return opInputMap;
+ }
+
+ public RecordDescriptor getOperatorInputRecordDescriptor(OperatorDescriptorId odId, int inputIndex) {
+ return getConnectorRecordDescriptor(getInputConnectorDescriptor(odId, inputIndex));
+ }
+
+ public Map<OperatorDescriptorId, IOperatorDescriptor> getOperatorMap() {
+ return opMap;
+ }
+
+ public Map<OperatorDescriptorId, List<IConnectorDescriptor>> getOperatorOutputMap() {
+ return opOutputMap;
+ }
+
+ public RecordDescriptor getOperatorOutputRecordDescriptor(OperatorDescriptorId odId, int outputIndex) {
+ return getConnectorRecordDescriptor(getOutputConnectorDescriptor(odId, outputIndex));
+ }
+
+ public IConnectorDescriptor getOutputConnectorDescriptor(IOperatorDescriptor op, int outputIndex) {
+ return getOutputConnectorDescriptor(op.getOperatorId(), outputIndex);
+ }
+
+ public IConnectorDescriptor getOutputConnectorDescriptor(OperatorDescriptorId odId, int outputIndex) {
+ return opOutputMap.get(odId).get(outputIndex);
+ }
+
+ public IOperatorDescriptor getProducer(IConnectorDescriptor conn) {
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
+ .getConnectorId());
+ return connInfo.getLeft().getLeft();
+ }
+
+ public int getProducerOutputIndex(IConnectorDescriptor conn) {
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
+ .getConnectorId());
+ return connInfo.getLeft().getRight();
+ }
+
+ public List<OperatorDescriptorId> getRoots() {
+ return roots;
+ }
+
+ public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+ return connectorPolicyAssignmentPolicy;
+ }
+
+ public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy) {
+ this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
+ }
+
+ public void setFrameSize(int frameSize) {
+ this.frameSize = frameSize;
+ }
+
+ public int getFrameSize() {
+ return frameSize;
+ }
+
+ public void setMaxReattempts(int maxReattempts) {
+ this.maxReattempts = maxReattempts;
+ }
+
+ public int getMaxReattempts() {
+ return maxReattempts;
+ }
+
+ public void addUserConstraint(Constraint constraint) {
+ userConstraints.add(constraint);
+ }
+
+ public Set<Constraint> getUserConstraints() {
+ return userConstraints;
+ }
+
+ public IJobletEventListenerFactory getJobletEventListenerFactory() {
+ return jobletEventListenerFactory;
+ }
+
+ public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
+ this.jobletEventListenerFactory = jobletEventListenerFactory;
+ }
+
+ public IGlobalJobDataFactory getGlobalJobDataFactory() {
+ return globalJobDataFactory;
+ }
+
+ public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
+ this.globalJobDataFactory = globalJobDataFactory;
+ }
+
+ public boolean isUseConnectorPolicyForScheduling() {
+ return useConnectorPolicyForScheduling;
+ }
+
+ public void setUseConnectorPolicyForScheduling(boolean useConnectorPolicyForScheduling) {
+ this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
+ }
+
+ private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
+ List<V> vList = map.get(key);
+ if (vList == null) {
+ vList = new ArrayList<V>();
+ map.put(key, vList);
+ }
+ extend(vList, index);
+ vList.set(index, value);
+ }
+
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+
+ for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : opMap.entrySet()) {
+ buffer.append(e.getKey().getId()).append(" : ").append(e.getValue().toString()).append("\n");
+ List<IConnectorDescriptor> inputs = opInputMap.get(e.getKey());
+ if (inputs != null && !inputs.isEmpty()) {
+ buffer.append(" Inputs:\n");
+ for (IConnectorDescriptor c : inputs) {
+ buffer.append(" ").append(c.getConnectorId().getId()).append(" : ").append(c.toString())
+ .append("\n");
+ }
+ }
+ List<IConnectorDescriptor> outputs = opOutputMap.get(e.getKey());
+ if (outputs != null && !outputs.isEmpty()) {
+ buffer.append(" Outputs:\n");
+ for (IConnectorDescriptor c : outputs) {
+ buffer.append(" ").append(c.getConnectorId().getId()).append(" : ").append(c.toString())
+ .append("\n");
+ }
+ }
+ }
+
+ buffer.append("\n").append("Constraints:\n").append(userConstraints);
+
+ return buffer.toString();
+ }
+
+ public JSONObject toJSON() throws JSONException {
+ JSONObject jjob = new JSONObject();
+
+ JSONArray jopArray = new JSONArray();
+ for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : opMap.entrySet()) {
+ jopArray.put(e.getValue().toJSON());
+ }
+ jjob.put("operators", jopArray);
+
+ JSONArray jcArray = new JSONArray();
+ for (Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> e : connMap.entrySet()) {
+ JSONObject conn = new JSONObject();
+ Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connection = connectorOpMap
+ .get(e.getKey());
+ if (connection != null) {
+ conn.put("in-operator-id", connection.getLeft().getLeft().getOperatorId().toString());
+ conn.put("in-operator-port", connection.getLeft().getRight().intValue());
+ conn.put("out-operator-id", connection.getRight().getLeft().getOperatorId().toString());
+ conn.put("out-operator-port", connection.getRight().getRight().intValue());
+ }
+ conn.put("connector", e.getValue().toJSON());
+ jcArray.put(conn);
+ }
+ jjob.put("connectors", jcArray);
+
+ return jjob;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobStatus.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobStatus.java
new file mode 100644
index 0000000..aa6fb68
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobStatus.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+public enum JobStatus {
+ INITIALIZED,
+ RUNNING,
+ TERMINATED,
+ FAILURE,
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
new file mode 100644
index 0000000..1ca0ac7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job.profiling.counters;
+
+public interface ICounter {
+ /**
+ * Get the fully-qualified name of the counter.
+ *
+ * @return Name of the counter.
+ */
+ public String getName();
+
+ /**
+ * Update the value of the counter to be current + delta.
+ *
+ * @param delta
+ * - Amount to change the counter value by.
+ * @return the new value after update.
+ */
+ public long update(long delta);
+
+ /**
+ * Set the value of the counter.
+ *
+ * @param value
+ * - New value of the counter.
+ * @return Old value of the counter.
+ */
+ public long set(long value);
+
+ /**
+ * Get the value of the counter.
+ *
+ * @return the value of the counter.
+ */
+ public long get();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounterContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounterContext.java
new file mode 100644
index 0000000..4be02d7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounterContext.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job.profiling.counters;
+
+/**
+ * A namespace that holds named counters.
+ *
+ * @author vinayakb
+ */
+public interface ICounterContext {
+ /**
+ * Get a counter with the specified name.
+ *
+ * @param name
+ * - Name of the counter to get.
+ * @param create
+ * - Create if the counter does not exist.
+ * @return An existing counter with the given name (if one exists). If a counter with the
+ * said name does not exist, a new one is created if create is set to <code>true</code>, or
+ * else returns <code>null</code>.
+ */
+ public ICounter getCounter(String name, boolean create);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java
new file mode 100644
index 0000000..96ad7fe
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.messages;
+
+import java.io.Serializable;
+
+/**
+ * @author rico
+ *
+ */
+public interface IMessage extends Serializable {
+
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java
new file mode 100644
index 0000000..35ae813
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.messages;
+
+/**
+ * @author rico
+ *
+ */
+public interface IMessageBroker {
+
+ public void receivedMessage(IMessage message, String nodeId);
+
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java
new file mode 100644
index 0000000..c3dfad0
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.partitions;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+
+public interface IPartition extends IDeallocatable {
+ public IHyracksTaskContext getTaskContext();
+
+ public void writeTo(IFrameWriter writer);
+
+ public boolean isReusable();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
new file mode 100644
index 0000000..be25d12
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.partitions;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public final class PartitionId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ private final ConnectorDescriptorId cdId;
+
+ private final int senderIndex;
+
+ private final int receiverIndex;
+
+ public PartitionId(JobId jobId, ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
+ this.jobId = jobId;
+ this.cdId = cdId;
+ this.senderIndex = senderIndex;
+ this.receiverIndex = receiverIndex;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+
+ public ConnectorDescriptorId getConnectorDescriptorId() {
+ return cdId;
+ }
+
+ public int getSenderIndex() {
+ return senderIndex;
+ }
+
+ public int getReceiverIndex() {
+ return receiverIndex;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((cdId == null) ? 0 : cdId.hashCode());
+ result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
+ result = prime * result + receiverIndex;
+ result = prime * result + senderIndex;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PartitionId other = (PartitionId) obj;
+ if (cdId == null) {
+ if (other.cdId != null)
+ return false;
+ } else if (!cdId.equals(other.cdId))
+ return false;
+ if (jobId == null) {
+ if (other.jobId != null)
+ return false;
+ } else if (!jobId.equals(other.jobId))
+ return false;
+ if (receiverIndex != other.receiverIndex)
+ return false;
+ if (senderIndex != other.senderIndex)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return jobId.toString() + ":" + cdId + ":" + senderIndex + ":" + receiverIndex;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/resources/IDeallocatable.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/resources/IDeallocatable.java
new file mode 100644
index 0000000..e5b3ed8
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/resources/IDeallocatable.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+public interface IDeallocatable {
+ public void deallocate();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/resources/IDeallocatableRegistry.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/resources/IDeallocatableRegistry.java
new file mode 100644
index 0000000..e130607
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/resources/IDeallocatableRegistry.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+public interface IDeallocatableRegistry {
+ public void registerDeallocatable(IDeallocatable deallocatable);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/ClusterTopology.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/ClusterTopology.java
new file mode 100644
index 0000000..6f566d1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/ClusterTopology.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class ClusterTopology implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final NetworkSwitch clusterSwitch;
+
+ public ClusterTopology(NetworkSwitch clusterSwitch) {
+ this.clusterSwitch = clusterSwitch;
+ }
+
+ public NetworkSwitch getClusterSwitch() {
+ return clusterSwitch;
+ }
+
+ public boolean lookupNetworkTerminal(String terminalName, List<Integer> path) {
+ return clusterSwitch.lookupNetworkTerminal(terminalName, path);
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkEndpoint.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkEndpoint.java
new file mode 100644
index 0000000..0aaee2a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkEndpoint.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public abstract class NetworkEndpoint implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public enum EndpointType {
+ NETWORK_SWITCH,
+ NETWORK_TERMINAL,
+ }
+
+ protected final String name;
+
+ protected final Map<String, String> properties;
+
+ public NetworkEndpoint(String name, Map<String, String> properties) {
+ this.name = name;
+ this.properties = properties;
+ }
+
+ public abstract EndpointType getType();
+
+ public final String getName() {
+ return name;
+ }
+
+ public final Map<String, String> getProperties() {
+ return properties;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkSwitch.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkSwitch.java
new file mode 100644
index 0000000..3c89044
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkSwitch.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class NetworkSwitch extends NetworkEndpoint {
+ private static final long serialVersionUID = 1L;
+
+ private final Port[] ports;
+
+ private final Map<String, Integer> terminalNamePortIndexMap;
+
+ public NetworkSwitch(String name, Map<String, String> properties, Port[] ports) {
+ super(name, properties);
+ this.ports = ports;
+ terminalNamePortIndexMap = new HashMap<String, Integer>();
+ for (int i = 0; i < ports.length; ++i) {
+ Port port = ports[i];
+ NetworkEndpoint endpoint = port.getEndpoint();
+ Integer portIndex = Integer.valueOf(i);
+ switch (endpoint.getType()) {
+ case NETWORK_SWITCH: {
+ NetworkSwitch s = (NetworkSwitch) endpoint;
+ for (String t : s.terminalNamePortIndexMap.keySet()) {
+ terminalNamePortIndexMap.put(t, portIndex);
+ }
+ break;
+ }
+
+ case NETWORK_TERMINAL: {
+ NetworkTerminal t = (NetworkTerminal) endpoint;
+ terminalNamePortIndexMap.put(t.getName(), portIndex);
+ break;
+ }
+ }
+ }
+ }
+
+ public Port[] getPorts() {
+ return ports;
+ }
+
+ @Override
+ public EndpointType getType() {
+ return EndpointType.NETWORK_SWITCH;
+ }
+
+ boolean lookupNetworkTerminal(String terminalName, List<Integer> path) {
+ if (terminalNamePortIndexMap.containsKey(terminalName)) {
+ Integer portIndex = terminalNamePortIndexMap.get(terminalName);
+ path.add(portIndex);
+ NetworkEndpoint endpoint = ports[portIndex.intValue()].getEndpoint();
+ if (endpoint.getType() == EndpointType.NETWORK_SWITCH) {
+ ((NetworkSwitch) endpoint).lookupNetworkTerminal(terminalName, path);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ void getPortList(List<Integer> path, int stepIndex, List<Port> portList) {
+ if (stepIndex >= path.size()) {
+ return;
+ }
+ int portIndex = path.get(stepIndex);
+ Port port = ports[portIndex];
+ portList.add(port);
+ ++stepIndex;
+ if (stepIndex >= path.size()) {
+ return;
+ }
+ NetworkEndpoint endpoint = port.getEndpoint();
+ if (endpoint.getType() != EndpointType.NETWORK_SWITCH) {
+ throw new IllegalArgumentException("Path provided, " + path + ", longer than depth of topology tree");
+ }
+ ((NetworkSwitch) endpoint).getPortList(path, stepIndex, portList);
+ }
+
+ public static class Port implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final NetworkEndpoint endpoint;
+
+ public Port(NetworkEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public NetworkEndpoint getEndpoint() {
+ return endpoint;
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkTerminal.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkTerminal.java
new file mode 100644
index 0000000..f36da76
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkTerminal.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.util.Map;
+
+public class NetworkTerminal extends NetworkEndpoint {
+ private static final long serialVersionUID = 1L;
+
+ public NetworkTerminal(String name, Map<String, String> properties) {
+ super(name, properties);
+ }
+
+ @Override
+ public EndpointType getType() {
+ return EndpointType.NETWORK_TERMINAL;
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/TopologyDefinitionParser.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/TopologyDefinitionParser.java
new file mode 100644
index 0000000..0ca6127
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/TopologyDefinitionParser.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.xml.sax.Attributes;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.DefaultHandler;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+import edu.uci.ics.hyracks.api.topology.NetworkEndpoint.EndpointType;
+
+public class TopologyDefinitionParser {
+ private final Stack<ElementStackEntry> stack;
+
+ private boolean inPropertyElement;
+
+ private TopologyDefinitionParser() {
+ stack = new Stack<ElementStackEntry>();
+ inPropertyElement = false;
+ }
+
+ public static ClusterTopology parse(InputSource in) throws IOException, SAXException {
+ TopologyDefinitionParser parser = new TopologyDefinitionParser();
+ return parser.parseInternal(in);
+ }
+
+ private ClusterTopology parseInternal(InputSource in) throws IOException, SAXException {
+ XMLReader parser;
+ parser = XMLReaderFactory.createXMLReader();
+ SAXContentHandler handler = new SAXContentHandler();
+ parser.setContentHandler(handler);
+ parser.parse(in);
+ if (stack.size() != 1) {
+ throw new IllegalStateException("Malformed topology definition");
+ }
+ ElementStackEntry e = stack.pop();
+ if (e.ports.size() != 1) {
+ throw new IllegalArgumentException("Malformed topology definition");
+ }
+ NetworkEndpoint endpoint = e.ports.get(0).getEndpoint();
+ if (endpoint.getType() != EndpointType.NETWORK_SWITCH) {
+ throw new IllegalArgumentException("Top level content in cluster-topology must be network-switch");
+ }
+ return new ClusterTopology((NetworkSwitch) endpoint);
+ }
+
+ private class SAXContentHandler extends DefaultHandler {
+ @Override
+ public void endElement(String uri, String localName, String qName) throws SAXException {
+ if ("network-switch".equals(localName) || "terminal".equals(localName)) {
+ ElementStackEntry e = stack.pop();
+ NetworkEndpoint endpoint = e.type == EndpointType.NETWORK_SWITCH ? new NetworkSwitch(e.name,
+ e.properties, e.ports.toArray(new NetworkSwitch.Port[e.ports.size()])) : new NetworkTerminal(
+ e.name, e.properties);
+ stack.peek().ports.add(new NetworkSwitch.Port(endpoint));
+ } else if ("property".equals(localName)) {
+ if (!inPropertyElement) {
+ throw new IllegalStateException("Improperly nested property element encountered");
+ }
+ inPropertyElement = false;
+ }
+ }
+
+ @Override
+ public void startElement(String uri, String localName, String qName, Attributes atts) throws SAXException {
+ if ("cluster-topology".equals(localName)) {
+ if (!stack.isEmpty()) {
+ throw new IllegalStateException("Encountered unexpected " + qName);
+ }
+ stack.push(new ElementStackEntry(null, ""));
+ } else if ("network-switch".equals(localName) || "terminal".equals(localName)) {
+ String name = atts.getValue("", "name");
+ if (name == null) {
+ throw new IllegalStateException("Encountered " + localName + " element with no name attribute");
+ }
+ EndpointType type = "network-switch".equals(localName) ? EndpointType.NETWORK_SWITCH
+ : EndpointType.NETWORK_TERMINAL;
+ ElementStackEntry e = new ElementStackEntry(type, name);
+ stack.push(e);
+ } else if ("property".equals(localName)) {
+ if (inPropertyElement) {
+ throw new IllegalStateException("Improperly nested property element encountered");
+ }
+ String name = atts.getValue("", "name");
+ if (name == null) {
+ throw new IllegalStateException("Encountered " + localName + " element with no name attribute");
+ }
+ String value = atts.getValue("", "value");
+ if (value == null) {
+ throw new IllegalStateException("Encountered " + localName + " element with no value attribute");
+ }
+ stack.peek().properties.put(name, value);
+ inPropertyElement = true;
+ }
+ }
+ }
+
+ private static class ElementStackEntry {
+ private final EndpointType type;
+
+ private final String name;
+
+ private final Map<String, String> properties;
+
+ private final List<NetworkSwitch.Port> ports;
+
+ public ElementStackEntry(EndpointType type, String name) {
+ this.type = type;
+ this.name = name;
+ this.properties = new HashMap<String, String>();
+ ports = new ArrayList<NetworkSwitch.Port>();
+ }
+ }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
new file mode 100644
index 0000000..1185b5e
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.Serializable;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Proxy;
+
+public class JavaSerializationUtils {
+ public static byte[] serialize(Serializable jobSpec) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(jobSpec);
+ return baos.toByteArray();
+ }
+
+ public static Object deserialize(byte[] bytes, ClassLoader classLoader) throws IOException, ClassNotFoundException {
+ if (bytes == null) {
+ return null;
+ }
+ ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(classLoader);
+ ObjectInputStream ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader);
+ return ois.readObject();
+ } finally {
+ Thread.currentThread().setContextClassLoader(ctxCL);
+ }
+ }
+
+ private static class ClassLoaderObjectInputStream extends ObjectInputStream {
+ private ClassLoader classLoader;
+
+ protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException,
+ SecurityException {
+ super(in);
+ this.classLoader = classLoader;
+ }
+
+ @Override
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
+ return Class.forName(desc.getName(), false, classLoader);
+ }
+
+ @Override
+ protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+ ClassLoader nonPublicLoader = null;
+ boolean hasNonPublicInterface = false;
+
+ // define proxy in class loader of non-public interface(s), if any
+ Class[] classObjs = new Class[interfaces.length];
+ for (int i = 0; i < interfaces.length; i++) {
+ Class cl = Class.forName(interfaces[i], false, classLoader);
+ if ((cl.getModifiers() & Modifier.PUBLIC) == 0) {
+ if (hasNonPublicInterface) {
+ if (nonPublicLoader != cl.getClassLoader()) {
+ throw new IllegalAccessError("conflicting non-public interface class loaders");
+ }
+ } else {
+ nonPublicLoader = cl.getClassLoader();
+ hasNonPublicInterface = true;
+ }
+ }
+ classObjs[i] = cl;
+ }
+ try {
+ return Proxy.getProxyClass(hasNonPublicInterface ? nonPublicLoader : classLoader, classObjs);
+ } catch (IllegalArgumentException e) {
+ throw new ClassNotFoundException(null, e);
+ }
+ }
+ }
+}
\ No newline at end of file