Merged fullstack_staging branch into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@2372 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/hyracks/hyracks-api/pom.xml b/fullstack/hyracks/hyracks-api/pom.xml
new file mode 100644
index 0000000..7a9f7ca
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/pom.xml
@@ -0,0 +1,57 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>hyracks-api</artifactId>
+  <name>hyracks-api</name>
+  <parent>
+    <groupId>edu.uci.ics.hyracks</groupId>
+    <artifactId>hyracks</artifactId>
+    <version>0.2.2-SNAPSHOT</version>
+  </parent>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.0.2</version>
+        <configuration> <!-- compile sources and emit bytecode at the Java 6 level -->
+          <source>1.6</source>
+          <target>1.6</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+  	<dependency>
+  		<groupId>org.json</groupId>
+  		<artifactId>json</artifactId>
+  		<version>20090211</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>org.apache.httpcomponents</groupId>
+  		<artifactId>httpclient</artifactId>
+  		<version>4.1-alpha2</version> <!-- NOTE(review): alpha (pre-release) build; consider moving to a GA HttpClient release -->
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>args4j</groupId>
+  		<artifactId>args4j</artifactId>
+  		<version>2.0.12</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
+  	<dependency>
+  		<groupId>edu.uci.ics.hyracks</groupId>
+  		<artifactId>hyracks-ipc</artifactId>
+  		<version>0.2.2-SNAPSHOT</version> <!-- same value as the parent version above; presumably bumped together, TODO confirm -->
+  	</dependency>
+  	<dependency>
+  		<groupId>org.apache.commons</groupId>
+  		<artifactId>commons-lang3</artifactId>
+  		<version>3.1</version>
+  	</dependency>
+  </dependencies>
+</project>
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
new file mode 100644
index 0000000..81cf511
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IApplicationContext.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.messages.IMessageBroker;
+
+/**
+ * Base interface of the {@link ICCApplicationContext} and the
+ * {@link INCApplicationContext}.
+ *
+ * @author vinayakb
+ *
+ */
+public interface IApplicationContext {
+    /**
+     * Provides the Class Loader that loads classes for this Hyracks Application.
+     * NOTE(review): earlier wording said "at the CC", yet NC contexts extend this interface too - confirm the scope.
+     *
+     * @return the application {@link ClassLoader}.
+     */
+    public ClassLoader getClassLoader();
+
+    /**
+     * Gets the distributed state that is made available to all the Application
+     * Contexts of this application in the cluster.
+     *
+     * @return the distributed state; presumably what {@link ICCApplicationContext#setDistributedState} stored - confirm
+     */
+    public Serializable getDistributedState();
+    /** Sets the {@link IMessageBroker} associated with this application context. */
+    public void setMessageBroker(IMessageBroker messageBroker);
+    /** @return the {@link IMessageBroker} associated with this application context. */
+    public IMessageBroker getMessageBroker();
+    /** @return the name of this application. */
+    public String getApplicationName();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java
new file mode 100644
index 0000000..19f97b4
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IBootstrap.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+/**
+ * Base interface of {@link ICCBootstrap} and {@link INCBootstrap}.
+ *
+ * @author vinayakb
+ */
+public interface IBootstrap {
+    /**
+     * Method called to start the application at a Hyracks CC or NC node.
+     *
+     * @throws Exception if the application fails to start; the concrete type is left to the implementation
+     */
+    public void start() throws Exception;
+
+    /**
+     * Method called to shut down the application at a Hyracks CC or NC node.
+     *
+     * @throws Exception if the application fails to shut down; the concrete type is left to the implementation
+     */
+    public void stop() throws Exception;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
new file mode 100644
index 0000000..2792d29
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.ICCContext;
+import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
+
+/**
+ * Application Context at the Cluster Controller for an application.
+ *
+ * @author vinayakb
+ */
+public interface ICCApplicationContext extends IApplicationContext {
+    /**
+     * Sets the state that must be distributed by the infrastructure to all the NC application contexts. Any state
+     * set by calling this method in the {@link ICCBootstrap#start()} call is made available to all the
+     * {@link INCApplicationContext} objects at each Node Controller. The state is then available to be inspected
+     * by the application at the NC during or after the {@link INCBootstrap#start()} call.
+     *
+     * @param state
+     *            The distributed state
+     */
+    public void setDistributedState(Serializable state);
+
+    /**
+     * Adds a listener that listens to Job Lifecycle events at the Cluster
+     * Controller.
+     *
+     * @param jobLifecycleListener the listener to add
+     */
+    public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+
+    /**
+     * Get the Cluster Controller Context.
+     *
+     * @return The Cluster Controller Context.
+     */
+    public ICCContext getCCContext();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCBootstrap.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCBootstrap.java
new file mode 100644
index 0000000..e3906ea
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCBootstrap.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.api.application;
+
+/**
+ * Implemented by the bootstrap class of the application that will manage its
+ * life cycle at the Cluster Controller.
+ *
+ * @author vinayakb
+ *
+ */
+public interface ICCBootstrap extends IBootstrap {
+    /**
+     * Called by the infrastructure to set the CC Application Context for the
+     * application. The infrastructure makes this call prior to calling {@link #start()}.
+     *
+     * @param appCtx
+     *            The CC application context
+     */
+    public void setApplicationContext(ICCApplicationContext appCtx);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
new file mode 100644
index 0000000..24a598b
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.application;
+
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+
+/**
+ * Application Context at the Node Controller for an application.
+ *
+ * @author vinayakb
+ *
+ */
+public interface INCApplicationContext extends IApplicationContext {
+    /**
+     * Gets the node Id of the Node Controller.
+     *
+     * @return the Node Id.
+     */
+    public String getNodeId();
+
+    /**
+     * Get the Hyracks Root Context.
+     *
+     * @return The Hyracks Root Context
+     */
+    public IHyracksRootContext getRootContext();
+
+    /**
+     * Set an object that can be later retrieved by the
+     * {@link #getApplicationObject()} call.
+     *
+     * @param object
+     *            Application Object
+     */
+    public void setApplicationObject(Object object);
+
+    /**
+     * Get the application object previously set by the
+     * {@link #setApplicationObject(Object)} call.
+     *
+     * @return Application Object
+     */
+    public Object getApplicationObject();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCBootstrap.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCBootstrap.java
new file mode 100644
index 0000000..300f7c7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCBootstrap.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.api.application;
+
+/**
+ * Implemented by the bootstrap class of the application that will manage its
+ * life cycle at a Node Controller.
+ *
+ * @author vinayakb
+ *
+ */
+public interface INCBootstrap extends IBootstrap {
+    /**
+     * Called by the infrastructure to set the NC Application Context for the
+     * application. The infrastructure makes this call prior to calling {@link #start()}.
+     *
+     * @param appCtx
+     *            The NC application context
+     */
+    public void setApplicationContext(INCApplicationContext appCtx);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
new file mode 100644
index 0000000..a8f2fda
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.channels;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IInputChannel {
+    public void registerMonitor(IInputChannelMonitor monitor); // NOTE(review): presumably the monitor's notify* callbacks are then driven by this channel - confirm
+    /** Attaches an arbitrary caller-supplied object to this channel. */
+    public void setAttachment(Object attachment);
+    /** @return the attachment object; presumably the one given to {@link #setAttachment(Object)} - TODO confirm. */
+    public Object getAttachment();
+    /** @return the next buffer of this channel. NOTE(review): the result when no buffer is ready is not specified here. */
+    public ByteBuffer getNextBuffer();
+    /** Gives a buffer back to the channel; presumably one obtained from {@link #getNextBuffer()} - TODO confirm. */
+    public void recycleBuffer(ByteBuffer buffer);
+    /** Opens the channel for the given task context. */
+    public void open(IHyracksTaskContext ctx) throws HyracksDataException;
+    /** Closes the channel. */
+    public void close() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java
new file mode 100644
index 0000000..065f4c5
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannelMonitor.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.channels;
+
+public interface IInputChannelMonitor {
+    public void notifyFailure(IInputChannel channel); // reports a failure concerning the given channel
+    /** Reports data availability on the channel; nFrames is presumably the number of frames now available - TODO confirm. */
+    public void notifyDataAvailability(IInputChannel channel, int nFrames);
+    /** Reports that the given channel has reached end of stream. */
+    public void notifyEndOfStream(IInputChannel channel);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/ClusterControllerInfo.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/ClusterControllerInfo.java
new file mode 100644
index 0000000..8dd4d6c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/ClusterControllerInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+
+public class ClusterControllerInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+    // Immutable value object: every field below is final and assigned once in the constructor.
+    private final String clientNetAddress;
+
+    private final int clientNetPort;
+
+    private final int webPort;
+    /** Stores the three values as given; no validation or copying is performed. */
+    public ClusterControllerInfo(String clientNetAddress, int clientNetPort, int webPort) {
+        this.clientNetAddress = clientNetAddress;
+        this.clientNetPort = clientNetPort;
+        this.webPort = webPort;
+    }
+    /** @return the web port; HyracksConnection uses it as the HTTP port when uploading an application archive. */
+    public int getWebPort() {
+        return webPort;
+    }
+    /** @return the client network address; presumably where clients reach the CC - TODO confirm. */
+    public String getClientNetAddress() {
+        return clientNetAddress;
+    }
+    /** @return the client network port; presumably paired with {@link #getClientNetAddress()} - TODO confirm. */
+    public int getClientNetPort() {
+        return clientNetPort;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
new file mode 100644
index 0000000..e34e60d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public class HyracksClientInterfaceFunctions {
+    public enum FunctionId { // tag returned by Function#getFunctionId() to identify the kind of request
+        GET_CLUSTER_CONTROLLER_INFO,
+        GET_CLUSTER_TOPOLOGY,
+        CREATE_APPLICATION,
+        START_APPLICATION,
+        DESTROY_APPLICATION,
+        CREATE_JOB, // NOTE(review): no Function subclass in this class returns this id
+        GET_JOB_STATUS,
+        START_JOB,
+        WAIT_FOR_COMPLETION,
+        GET_NODE_CONTROLLERS_INFO
+    }
+    /** Serializable base type of every client request; each subclass reports the {@link FunctionId} it stands for. */
+    public abstract static class Function implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public abstract FunctionId getFunctionId();
+    }
+    /** Argument-less request tagged {@link FunctionId#GET_CLUSTER_CONTROLLER_INFO}. */
+    public static class GetClusterControllerInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_CLUSTER_CONTROLLER_INFO;
+        }
+    }
+    /** Request tagged {@link FunctionId#CREATE_APPLICATION}; carries the application name. */
+    public static class CreateApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public CreateApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.CREATE_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+    /** Request tagged {@link FunctionId#START_APPLICATION}; carries the application name. */
+    public static class StartApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public StartApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+    /** Request tagged {@link FunctionId#DESTROY_APPLICATION}; carries the application name. */
+    public static class DestroyApplicationFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+
+        public DestroyApplicationFunction(String appName) {
+            this.appName = appName;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.DESTROY_APPLICATION;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+    }
+    /** Request tagged {@link FunctionId#GET_JOB_STATUS}; carries the {@link JobId} being queried. */
+    public static class GetJobStatusFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public GetJobStatusFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_JOB_STATUS;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+    /** Request tagged {@link FunctionId#START_JOB}; carries the application name, a byte payload and the job flags. */
+    public static class StartJobFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String appName;
+        private final byte[] acggfBytes; // HyracksConnection fills this with a Java-serialized IActivityClusterGraphGeneratorFactory
+        private final EnumSet<JobFlag> jobFlags;
+
+        public StartJobFunction(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) {
+            this.appName = appName;
+            this.acggfBytes = acggfBytes;
+            this.jobFlags = jobFlags;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.START_JOB;
+        }
+
+        public String getAppName() {
+            return appName;
+        }
+        /** @return the array passed to the constructor (the same instance, not a copy). */
+        public byte[] getACGGFBytes() {
+            return acggfBytes;
+        }
+
+        public EnumSet<JobFlag> getJobFlags() {
+            return jobFlags;
+        }
+    }
+    /** Request tagged {@link FunctionId#WAIT_FOR_COMPLETION}; carries the {@link JobId} to wait on. */
+    public static class WaitForCompletionFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final JobId jobId;
+
+        public WaitForCompletionFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.WAIT_FOR_COMPLETION;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+    /** Argument-less request tagged {@link FunctionId#GET_NODE_CONTROLLERS_INFO}. */
+    public static class GetNodeControllersInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_NODE_CONTROLLERS_INFO;
+        }
+    }
+    /** Argument-less request tagged {@link FunctionId#GET_CLUSTER_TOPOLOGY}. */
+    public static class GetClusterTopologyFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_CLUSTER_TOPOLOGY;
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
new file mode 100644
index 0000000..4c06d42
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+
+public class HyracksClientInterfaceRemoteProxy implements IHyracksClientInterface {
+    // Client-side stub: each method wraps its arguments in the matching HyracksClientInterfaceFunctions
+    // request object and passes it, together with ipcHandle, to rpci.call(...).
+    private final IIPCHandle ipcHandle;
+
+    private final RPCInterface rpci;
+
+    /**
+     * Creates a proxy that performs every call through the given RPC interface and IPC handle.
+     *
+     * @param ipcHandle handle handed to each {@code rpci.call(...)} made by this proxy
+     * @param rpci RPC interface used to perform the calls
+     */
+    public HyracksClientInterfaceRemoteProxy(IIPCHandle ipcHandle, RPCInterface rpci) {
+        this.ipcHandle = ipcHandle;
+        this.rpci = rpci;
+    }
+
+    @Override
+    public ClusterControllerInfo getClusterControllerInfo() throws Exception {
+        HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction gccif = new HyracksClientInterfaceFunctions.GetClusterControllerInfoFunction();
+        return (ClusterControllerInfo) rpci.call(ipcHandle, gccif);
+    }
+
+    @Override
+    public void createApplication(String appName) throws Exception {
+        rpci.call(ipcHandle, new HyracksClientInterfaceFunctions.CreateApplicationFunction(appName));
+    }
+
+    @Override
+    public void startApplication(String appName) throws Exception {
+        rpci.call(ipcHandle, new HyracksClientInterfaceFunctions.StartApplicationFunction(appName));
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+        rpci.call(ipcHandle, new HyracksClientInterfaceFunctions.DestroyApplicationFunction(appName));
+    }
+
+    @Override
+    public JobStatus getJobStatus(JobId jobId) throws Exception {
+        return (JobStatus) rpci.call(ipcHandle, new HyracksClientInterfaceFunctions.GetJobStatusFunction(jobId));
+    }
+
+    @Override
+    public JobId startJob(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception {
+        HyracksClientInterfaceFunctions.StartJobFunction sjf = new HyracksClientInterfaceFunctions.StartJobFunction(
+                appName, acggfBytes, jobFlags);
+        return (JobId) rpci.call(ipcHandle, sjf);
+    }
+
+    @Override
+    public void waitForCompletion(JobId jobId) throws Exception {
+        rpci.call(ipcHandle, new HyracksClientInterfaceFunctions.WaitForCompletionFunction(jobId));
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception {
+        HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction gncif = new HyracksClientInterfaceFunctions.GetNodeControllersInfoFunction();
+        // The reply is cast from an untyped result, so the Map's type arguments cannot be verified here.
+        return (Map<String, NodeControllerInfo>) rpci.call(ipcHandle, gncif);
+    }
+
+    @Override
+    public ClusterTopology getClusterTopology() throws Exception {
+        HyracksClientInterfaceFunctions.GetClusterTopologyFunction gctf = new HyracksClientInterfaceFunctions.GetClusterTopologyFunction();
+        return (ClusterTopology) rpci.call(ipcHandle, gctf);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
new file mode 100644
index 0000000..227524c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.FileEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import edu.uci.ics.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
+import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
+import edu.uci.ics.hyracks.ipc.api.RPCInterface;
+import edu.uci.ics.hyracks.ipc.impl.IPCSystem;
+import edu.uci.ics.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+
+/**
+ * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster Controller.
+ * NOTE(review): the IPC system started by the constructor is never stopped anywhere in this class.
+ *
+ * @author vinayakb
+ */
+public final class HyracksConnection implements IHyracksClientConnection {
+    private final String ccHost;
+    private final IPCSystem ipc;
+    private final IHyracksClientInterface hci;
+    private final ClusterControllerInfo ccInfo;
+
+    /**
+     * Constructor to create a connection to the Hyracks Cluster Controller.
+     *
+     * @param ccHost
+     *            Host name (or IP Address) where the Cluster Controller can be reached.
+     * @param ccPort
+     *            Port to reach the Hyracks Cluster Controller at the specified host name.
+     * @throws Exception if starting the IPC system, obtaining the CC handle or the initial CC query fails
+     */
+    public HyracksConnection(String ccHost, int ccPort) throws Exception {
+        this.ccHost = ccHost;
+        RPCInterface rpci = new RPCInterface();
+        ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
+        ipc.start();
+        IIPCHandle ccIpchandle = ipc.getHandle(new InetSocketAddress(ccHost, ccPort));
+        this.hci = new HyracksClientInterfaceRemoteProxy(ccIpchandle, rpci);
+        ccInfo = hci.getClusterControllerInfo();
+    }
+
+    @Override
+    public void createApplication(String appName, File harFile) throws Exception {
+        hci.createApplication(appName);
+        if (harFile != null) {
+            HttpClient hc = new DefaultHttpClient();
+            try {
+                HttpPut put = new HttpPut("http://" + ccHost + ":" + ccInfo.getWebPort() + "/applications/" + appName);
+                put.setEntity(new FileEntity(harFile, "application/octet-stream"));
+                HttpResponse response = hc.execute(put);
+                if (response.getStatusLine().getStatusCode() != 200) {
+                    // Upload was rejected: remove the application registered above before reporting the failure.
+                    hci.destroyApplication(appName);
+                    throw new HyracksException(response.getStatusLine().toString());
+                }
+            } finally {
+                // The client is single-use; shutting down its connection manager releases the connections it holds.
+                hc.getConnectionManager().shutdown();
+            }
+        }
+        hci.startApplication(appName);
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+        hci.destroyApplication(appName);
+    }
+
+    @Override
+    public JobStatus getJobStatus(JobId jobId) throws Exception {
+        return hci.getJobStatus(jobId);
+    }
+
+    @Override
+    public JobId startJob(String appName, JobSpecification jobSpec) throws Exception {
+        return startJob(appName, jobSpec, EnumSet.noneOf(JobFlag.class));
+    }
+
+    @Override
+    public JobId startJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        return startJob(appName, new JobSpecificationActivityClusterGraphGeneratorFactory(jobSpec), jobFlags);
+    }
+
+    /** Serializes the given generator factory with {@link JavaSerializationUtils} and submits the bytes to the CC. */
+    public JobId startJob(String appName, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+            throws Exception {
+        return hci.startJob(appName, JavaSerializationUtils.serialize(acggf), jobFlags);
+    }
+
+    @Override
+    public void waitForCompletion(JobId jobId) throws Exception {
+        hci.waitForCompletion(jobId);
+    }
+
+    @Override
+    public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception {
+        return hci.getNodeControllersInfo();
+    }
+
+    @Override
+    public ClusterTopology getClusterTopology() throws Exception {
+        return hci.getClusterTopology();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
new file mode 100644
index 0000000..bdbb544
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.File;
+import java.util.EnumSet;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+
+/**
+ * Interface used by clients to communicate with the Hyracks Cluster Controller.
+ * <p>
+ * Every method is declared to throw {@link Exception}; the concrete failure types depend on the
+ * implementation.
+ *
+ * @author vinayakb
+ */
+public interface IHyracksClientConnection {
+    /**
+     * Creates a Hyracks application.
+     *
+     * @param appName
+     *            Name of the application
+     * @param harFile
+     *            Archive that contains deployable code for the application
+     * @throws Exception
+     *             if the application could not be created
+     */
+    public void createApplication(String appName, File harFile) throws Exception;
+
+    /**
+     * Destroys an already-deployed Hyracks application.
+     *
+     * @param appName
+     *            Name of the application
+     * @throws Exception
+     *             if the application could not be destroyed
+     */
+    public void destroyApplication(String appName) throws Exception;
+
+    /**
+     * Gets the status of the specified job.
+     *
+     * @param jobId
+     *            JobId of the Job
+     * @return {@link JobStatus}
+     * @throws Exception
+     *             if the status could not be obtained
+     */
+    public JobStatus getJobStatus(JobId jobId) throws Exception;
+
+    /**
+     * Starts the specified job.
+     *
+     * @param appName
+     *            Name of the application
+     * @param jobSpec
+     *            Job Specification
+     * @return the {@link JobId} for the started job
+     * @throws Exception
+     *             if the job could not be started
+     */
+    public JobId startJob(String appName, JobSpecification jobSpec) throws Exception;
+
+    /**
+     * Starts the specified job with the given flags.
+     *
+     * @param appName
+     *            Name of the application
+     * @param jobSpec
+     *            Job Specification
+     * @param jobFlags
+     *            Flags
+     * @return the {@link JobId} for the started job
+     * @throws Exception
+     *             if the job could not be started
+     */
+    public JobId startJob(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+
+    /**
+     * Starts a job described by an activity cluster graph generator factory rather than a
+     * {@link JobSpecification}.
+     *
+     * @param appName
+     *            Name of the application
+     * @param acggf
+     *            Activity Cluster Graph Generator Factory
+     * @param jobFlags
+     *            Flags
+     * @return the {@link JobId} for the started job
+     * @throws Exception
+     *             if the job could not be started
+     */
+    public JobId startJob(String appName, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags)
+            throws Exception;
+
+    /**
+     * Waits until the specified job has completed, either successfully or has
+     * encountered a permanent failure.
+     *
+     * @param jobId
+     *            JobId of the Job
+     * @throws Exception
+     *             if waiting on the job fails
+     */
+    public void waitForCompletion(JobId jobId) throws Exception;
+
+    /**
+     * Gets a map of node controller names to node information.
+     *
+     * @return Map of node name to node information.
+     * @throws Exception
+     *             if the information could not be obtained
+     */
+    public Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception;
+
+    /**
+     * Gets the cluster topology.
+     *
+     * @return the cluster topology
+     * @throws Exception
+     *             if the topology could not be obtained
+     */
+    public ClusterTopology getClusterTopology() throws Exception;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
new file mode 100644
index 0000000..ef5906e
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.util.EnumSet;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+
+/**
+ * Lower-level client operations: application lifecycle, job submission and status, and cluster
+ * information.
+ * <p>
+ * NOTE(review): presumably this is the remote-facing contract that client connection objects
+ * forward to -- confirm against the implementing and calling classes.
+ */
+public interface IHyracksClientInterface {
+    /** Returns a {@link ClusterControllerInfo}; its contents are not visible from this file. */
+    public ClusterControllerInfo getClusterControllerInfo() throws Exception;
+
+    /**
+     * Creates the application with the given name. Unlike
+     * {@code IHyracksClientConnection.createApplication}, no archive file is passed here.
+     */
+    public void createApplication(String appName) throws Exception;
+
+    /** Starts the application with the given name. */
+    public void startApplication(String appName) throws Exception;
+
+    /** Destroys the application with the given name. */
+    public void destroyApplication(String appName) throws Exception;
+
+    /** Returns the {@link JobStatus} of the given job. */
+    public JobStatus getJobStatus(JobId jobId) throws Exception;
+
+    /**
+     * Starts a job.
+     *
+     * @param appName
+     *            Name of the application
+     * @param acggfBytes
+     *            presumably a Java-serialized activity cluster graph generator factory -- TODO
+     *            confirm against the caller
+     * @param jobFlags
+     *            Flags
+     * @return the {@link JobId} for the started job
+     */
+    public JobId startJob(String appName, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
+
+    /** Waits for the given job; the exact completion semantics are defined by the implementation. */
+    public void waitForCompletion(JobId jobId) throws Exception;
+
+    /** Returns a map from a String key (presumably the node id -- confirm) to {@link NodeControllerInfo}. */
+    public Map<String, NodeControllerInfo> getNodeControllersInfo() throws Exception;
+
+    /** Returns the {@link ClusterTopology}. */
+    public ClusterTopology getClusterTopology() throws Exception;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
new file mode 100644
index 0000000..fd9218a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeControllerInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+
+/**
+ * Serializable value holder describing one node controller: its id, its {@link NodeStatus} and its
+ * {@link NetworkAddress}. All three fields are final and assigned once in the constructor.
+ */
+public class NodeControllerInfo implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String nodeId;
+    private final NodeStatus status;
+    private final NetworkAddress netAddress;
+
+    /**
+     * Stores the given values as-is; no validation or copying is performed.
+     *
+     * @param nodeId
+     *            id of the node
+     * @param status
+     *            status of the node
+     * @param netAddress
+     *            network address of the node
+     */
+    public NodeControllerInfo(String nodeId, NodeStatus status, NetworkAddress netAddress) {
+        this.netAddress = netAddress;
+        this.status = status;
+        this.nodeId = nodeId;
+    }
+
+    /** @return the network address passed to the constructor */
+    public NetworkAddress getNetworkAddress() {
+        return this.netAddress;
+    }
+
+    /** @return the status passed to the constructor */
+    public NodeStatus getStatus() {
+        return this.status;
+    }
+
+    /** @return the node id passed to the constructor */
+    public String getNodeId() {
+        return this.nodeId;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeStatus.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeStatus.java
new file mode 100644
index 0000000..d6b99d0
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/NodeStatus.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client;
+
+/**
+ * Status value carried by {@link NodeControllerInfo}.
+ */
+public enum NodeStatus {
+    // NOTE(review): presumably the node is currently reachable/registered -- confirm with the code that sets it.
+    ALIVE,
+    // NOTE(review): presumably the node has been detected as failed or gone -- confirm with the code that sets it.
+    DEAD
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ActivityClusterGraphBuilder.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
new file mode 100644
index 0000000..6c6c211
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/ActivityClusterGraphBuilder.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONException;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+/**
+ * Groups the activities of a {@link JobActivityGraph} into {@link ActivityCluster}s and assembles
+ * them into an {@link ActivityClusterGraph}.
+ * <p>
+ * Activities that are linked by a connector, directly or transitively, end up in the same cluster.
+ * Blocking edges recorded in the job activity graph become dependencies between clusters.
+ */
+public class ActivityClusterGraphBuilder {
+    private static final Logger LOGGER = Logger.getLogger(ActivityClusterGraphBuilder.class.getName());
+
+    public ActivityClusterGraphBuilder() {
+    }
+
+    /**
+     * Looks for an activity that is connected, through one of its input or output connectors, to
+     * an activity that is not yet in the same equivalence set.
+     *
+     * @param jag
+     *            job activity graph supplying the connector wiring
+     * @param eqSets
+     *            current equivalence sets of activities
+     * @return a pair (activity, neighbour in a different set), or {@code null} when every connector
+     *         already links activities of the same set
+     */
+    private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, Set<Set<ActivityId>> eqSets) {
+        for (Set<ActivityId> eqSet : eqSets) {
+            for (ActivityId t : eqSet) {
+                // Producers feeding this activity must live in the same set.
+                List<IConnectorDescriptor> inputList = jag.getActivityInputMap().get(t);
+                if (inputList != null) {
+                    for (IConnectorDescriptor conn : inputList) {
+                        ActivityId inTask = jag.getProducerActivity(conn.getConnectorId());
+                        if (!eqSet.contains(inTask)) {
+                            return Pair.<ActivityId, ActivityId> of(t, inTask);
+                        }
+                    }
+                }
+                // Consumers fed by this activity must live in the same set as well.
+                List<IConnectorDescriptor> outputList = jag.getActivityOutputMap().get(t);
+                if (outputList != null) {
+                    for (IConnectorDescriptor conn : outputList) {
+                        ActivityId outTask = jag.getConsumerActivity(conn.getConnectorId());
+                        if (!eqSet.contains(outTask)) {
+                            return Pair.<ActivityId, ActivityId> of(t, outTask);
+                        }
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Builds the activity cluster graph for a job.
+     * <p>
+     * Steps: (1) start with one singleton set per activity and merge sets until no connector crosses
+     * a set boundary; (2) create one {@link ActivityCluster} per resulting set; (3) register roots and
+     * connector wiring on each cluster; (4) copy blocking information and derive cluster
+     * dependencies from it.
+     *
+     * @param jobId
+     *            job id used to build the {@link ActivityClusterId}s
+     * @param jag
+     *            job activity graph to partition
+     * @return the populated activity cluster graph
+     * @throws RuntimeException
+     *             wrapping a {@link JSONException} if FINE logging is enabled and the graph cannot be
+     *             rendered as JSON
+     */
+    public ActivityClusterGraph inferActivityClusters(JobId jobId, JobActivityGraph jag) {
+        /*
+         * Build initial equivalence sets map. We create a map such that for each activity id, t -> { t }
+         */
+        Map<ActivityId, Set<ActivityId>> stageMap = new HashMap<ActivityId, Set<ActivityId>>();
+        Set<Set<ActivityId>> stages = new HashSet<Set<ActivityId>>();
+        for (ActivityId taskId : jag.getActivityMap().keySet()) {
+            Set<ActivityId> eqSet = new HashSet<ActivityId>();
+            eqSet.add(taskId);
+            stageMap.put(taskId, eqSet);
+            stages.add(eqSet);
+        }
+
+        // Merge one pair per iteration until findMergePair reports nothing left to merge.
+        boolean changed = true;
+        while (changed) {
+            changed = false;
+            Pair<ActivityId, ActivityId> pair = findMergePair(jag, stages);
+            if (pair != null) {
+                merge(stageMap, stages, pair.getLeft(), pair.getRight());
+                changed = true;
+            }
+        }
+
+        // One activity cluster per equivalence set; cluster ids are numbered in iteration order.
+        ActivityClusterGraph acg = new ActivityClusterGraph();
+        Map<ActivityId, ActivityCluster> acMap = new HashMap<ActivityId, ActivityCluster>();
+        int acCounter = 0;
+        Map<ActivityId, IActivity> activityNodeMap = jag.getActivityMap();
+        List<ActivityCluster> acList = new ArrayList<ActivityCluster>();
+        for (Set<ActivityId> stage : stages) {
+            ActivityCluster ac = new ActivityCluster(acg, new ActivityClusterId(jobId, acCounter++));
+            acList.add(ac);
+            for (ActivityId aid : stage) {
+                IActivity activity = activityNodeMap.get(aid);
+                ac.addActivity(activity);
+                acMap.put(aid, ac);
+            }
+        }
+
+        for (Set<ActivityId> stage : stages) {
+            for (ActivityId aid : stage) {
+                IActivity activity = activityNodeMap.get(aid);
+                ActivityCluster ac = acMap.get(aid);
+                List<IConnectorDescriptor> aOutputs = jag.getActivityOutputMap().get(aid);
+                if (aOutputs == null || aOutputs.isEmpty()) {
+                    // An activity without output connectors is registered as a root of its cluster.
+                    ac.addRoot(activity);
+                } else {
+                    int nActivityOutputs = aOutputs.size();
+                    for (int i = 0; i < nActivityOutputs; ++i) {
+                        IConnectorDescriptor conn = aOutputs.get(i);
+                        ac.addConnector(conn);
+                        // The right element of the pair is the consumer side (activity, input index).
+                        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> pcPair = jag.getConnectorActivityMap()
+                                .get(conn.getConnectorId());
+                        ac.connect(conn, activity, i, pcPair.getRight().getLeft(), pcPair.getRight().getRight(), jag
+                                .getConnectorRecordDescriptorMap().get(conn.getConnectorId()));
+                    }
+                }
+            }
+        }
+
+        Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = jag.getBlocked2BlockerMap();
+        for (ActivityCluster s : acList) {
+            Map<ActivityId, Set<ActivityId>> acBlocked2BlockerMap = s.getBlocked2BlockerMap();
+            Set<ActivityCluster> blockerStages = new HashSet<ActivityCluster>();
+            for (ActivityId t : s.getActivityMap().keySet()) {
+                Set<ActivityId> blockerTasks = blocked2BlockerMap.get(t);
+                // Recorded for every activity; the value is null when the activity has no blockers.
+                acBlocked2BlockerMap.put(t, blockerTasks);
+                if (blockerTasks != null) {
+                    for (ActivityId bt : blockerTasks) {
+                        blockerStages.add(acMap.get(bt));
+                    }
+                }
+            }
+            for (ActivityCluster bs : blockerStages) {
+                s.getDependencies().add(bs);
+            }
+        }
+        acg.addActivityClusters(acList);
+
+        if (LOGGER.isLoggable(Level.FINE)) {
+            try {
+                LOGGER.fine(acg.toJSON().toString(2));
+            } catch (JSONException e) {
+                // The exception is propagated with its cause attached, so it is not also printed here.
+                throw new RuntimeException(e);
+            }
+        }
+        return acg;
+    }
+
+    /**
+     * Replaces the equivalence sets containing {@code t1} and {@code t2} with their union and
+     * repoints every member of the union to the new set.
+     *
+     * @param eqSetMap
+     *            activity id to owning equivalence set; updated in place
+     * @param eqSets
+     *            collection of all equivalence sets; updated in place
+     * @param t1
+     *            an activity in the first set
+     * @param t2
+     *            an activity in the second set
+     */
+    private void merge(Map<ActivityId, Set<ActivityId>> eqSetMap, Set<Set<ActivityId>> eqSets, ActivityId t1,
+            ActivityId t2) {
+        Set<ActivityId> stage1 = eqSetMap.get(t1);
+        Set<ActivityId> stage2 = eqSetMap.get(t2);
+
+        Set<ActivityId> mergedSet = new HashSet<ActivityId>();
+        mergedSet.addAll(stage1);
+        mergedSet.addAll(stage2);
+
+        eqSets.remove(stage1);
+        eqSets.remove(stage2);
+        eqSets.add(mergedSet);
+
+        for (ActivityId t : mergedSet) {
+            eqSetMap.put(t, mergedSet);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IConnectorDescriptorVisitor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IConnectorDescriptorVisitor.java
new file mode 100644
index 0000000..5b508b3
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IConnectorDescriptorVisitor.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client.impl;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+/**
+ * Callback invoked with an {@link IConnectorDescriptor}.
+ */
+public interface IConnectorDescriptorVisitor {
+    /**
+     * Handles one connector descriptor.
+     *
+     * @param conn
+     *            the connector being visited
+     * @throws HyracksException
+     *             if the visitor fails on this connector
+     */
+    public void visit(IConnectorDescriptor conn) throws HyracksException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IOperatorDescriptorVisitor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IOperatorDescriptorVisitor.java
new file mode 100644
index 0000000..fcdf9f0
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/IOperatorDescriptorVisitor.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client.impl;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+/**
+ * Callback invoked with an {@link IOperatorDescriptor}.
+ */
+public interface IOperatorDescriptorVisitor {
+    /**
+     * Handles one operator descriptor.
+     *
+     * @param op
+     *            the operator being visited
+     * @throws HyracksException
+     *             if the visitor fails on this operator
+     */
+    public void visit(IOperatorDescriptor op) throws HyracksException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobActivityGraphBuilder.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobActivityGraphBuilder.java
new file mode 100644
index 0000000..b4b9b3c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobActivityGraphBuilder.java
@@ -0,0 +1,127 @@
+package edu.uci.ics.hyracks.api.client.impl;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/**
+ * {@link IActivityGraphBuilder} that records the activities, connectors and edges of a
+ * {@link JobSpecification} into a {@link JobActivityGraph}.
+ * <p>
+ * Producer and consumer endpoints of each connector are collected separately while edges are being
+ * added and are paired up in {@link #finish()}.
+ */
+public class JobActivityGraphBuilder implements IActivityGraphBuilder {
+    private static final Logger LOGGER = Logger.getLogger(JobActivityGraphBuilder.class.getName());
+
+    private final Map<ActivityId, IOperatorDescriptor> activityOperatorMap;
+
+    private final JobActivityGraph jag;
+
+    private final JobSpecification jobSpec;
+
+    private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorProducerMap;
+
+    private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorConsumerMap;
+
+    /**
+     * @param jobSpec
+     *            specification whose connectors and record descriptors are looked up while building
+     * @param jobFlags
+     *            accepted but not used by this constructor
+     */
+    public JobActivityGraphBuilder(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+        this.activityOperatorMap = new HashMap<ActivityId, IOperatorDescriptor>();
+        this.jag = new JobActivityGraph();
+        this.jobSpec = jobSpec;
+        this.connectorProducerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
+        this.connectorConsumerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
+    }
+
+    /**
+     * Registers a connector and its record descriptor (taken from the job specification) in the
+     * graph.
+     */
+    public void addConnector(IConnectorDescriptor conn) {
+        ConnectorDescriptorId connectorId = conn.getConnectorId();
+        jag.getConnectorMap().put(connectorId, conn);
+        jag.getConnectorRecordDescriptorMap().put(connectorId, jobSpec.getConnectorRecordDescriptor(conn));
+    }
+
+    /** Records that {@code blocker} blocks {@code blocked}, keyed by the blocked activity's id. */
+    @Override
+    public void addBlockingEdge(IActivity blocker, IActivity blocked) {
+        addToValueSet(jag.getBlocked2BlockerMap(), blocked.getActivityId(), blocker.getActivityId());
+    }
+
+    /**
+     * Binds input {@code taskInputIndex} of {@code task} to the connector attached to input
+     * {@code operatorInputIndex} of the operator that contributed the activity, and remembers the
+     * activity as that connector's consumer.
+     */
+    @Override
+    public void addSourceEdge(int operatorInputIndex, IActivity task, int taskInputIndex) {
+        ActivityId activityId = task.getActivityId();
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("Adding source edge: " + activityId + ":" + operatorInputIndex + " -> " + activityId + ":"
+                    + taskInputIndex);
+        }
+        IOperatorDescriptor owner = activityOperatorMap.get(activityId);
+        IConnectorDescriptor inputConn = jobSpec.getInputConnectorDescriptor(owner, operatorInputIndex);
+        insertIntoIndexedMap(jag.getActivityInputMap(), activityId, taskInputIndex, inputConn);
+        connectorConsumerMap.put(inputConn.getConnectorId(), Pair.of(task, taskInputIndex));
+    }
+
+    /**
+     * Binds output {@code taskOutputIndex} of {@code task} to the connector attached to output
+     * {@code operatorOutputIndex} of the operator that contributed the activity, and remembers the
+     * activity as that connector's producer.
+     */
+    @Override
+    public void addTargetEdge(int operatorOutputIndex, IActivity task, int taskOutputIndex) {
+        ActivityId activityId = task.getActivityId();
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("Adding target edge: " + activityId + ":" + operatorOutputIndex + " -> " + activityId + ":"
+                    + taskOutputIndex);
+        }
+        IOperatorDescriptor owner = activityOperatorMap.get(activityId);
+        IConnectorDescriptor outputConn = jobSpec.getOutputConnectorDescriptor(owner, operatorOutputIndex);
+        insertIntoIndexedMap(jag.getActivityOutputMap(), activityId, taskOutputIndex, outputConn);
+        connectorProducerMap.put(outputConn.getConnectorId(), Pair.of(task, taskOutputIndex));
+    }
+
+    /** Registers an activity in the graph and remembers which operator contributed it. */
+    @Override
+    public void addActivity(IOperatorDescriptor op, IActivity task) {
+        ActivityId activityId = task.getActivityId();
+        activityOperatorMap.put(activityId, op);
+        jag.getActivityMap().put(activityId, task);
+    }
+
+    /**
+     * Pairs each recorded producer endpoint with the consumer endpoint of the same connector and
+     * stores the pair in the graph's connector-activity map. The consumer side is {@code null} for
+     * a connector that has no recorded consumer.
+     */
+    public void finish() {
+        Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> endpoints = jag
+                .getConnectorActivityMap();
+        for (Map.Entry<ConnectorDescriptorId, Pair<IActivity, Integer>> entry : connectorProducerMap.entrySet()) {
+            ConnectorDescriptorId connectorId = entry.getKey();
+            Pair<IActivity, Integer> producerEnd = entry.getValue();
+            Pair<IActivity, Integer> consumerEnd = connectorConsumerMap.get(connectorId);
+            endpoints.put(connectorId, Pair.of(producerEnd, consumerEnd));
+        }
+    }
+
+    /** Adds {@code value} to the set stored under {@code key}, creating the set on first use. */
+    private <K, V> void addToValueSet(Map<K, Set<V>> map, K key, V value) {
+        Set<V> values = map.get(key);
+        if (values == null) {
+            values = new HashSet<V>();
+            map.put(key, values);
+        }
+        values.add(value);
+    }
+
+    /**
+     * Stores {@code value} at position {@code index} of the list kept under {@code key}, creating
+     * the list on first use and padding it with {@code null}s so that {@code index} is valid.
+     */
+    private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
+        List<V> slots = map.get(key);
+        if (slots == null) {
+            slots = new ArrayList<V>();
+            map.put(key, slots);
+        }
+        while (slots.size() <= index) {
+            slots.add(null);
+        }
+        slots.set(index, value);
+    }
+
+    /** @return the graph populated by this builder */
+    public JobActivityGraph getActivityGraph() {
+        return jag;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
new file mode 100644
index 0000000..f36b7b3
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.hyracks.api.client.impl;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
+import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/**
+ * {@link IActivityClusterGraphGeneratorFactory} backed by a {@link JobSpecification}. The activity
+ * cluster graph and the scheduling constraints are computed when the generator is created; the
+ * returned generator only hands them out.
+ */
+public class JobSpecificationActivityClusterGraphGeneratorFactory implements IActivityClusterGraphGeneratorFactory {
+    private static final long serialVersionUID = 1L;
+
+    private final JobSpecification spec;
+
+    public JobSpecificationActivityClusterGraphGeneratorFactory(JobSpecification jobSpec) {
+        this.spec = jobSpec;
+    }
+
+    /**
+     * Builds the activity cluster graph for the wrapped specification, gathers its scheduling
+     * constraints and returns a generator exposing both.
+     *
+     * @param appName
+     *            not used by this implementation
+     * @param jobId
+     *            id of the job, used when inferring activity clusters
+     * @param ccAppCtx
+     *            application context passed to operators and connectors when they contribute
+     *            scheduling constraints
+     * @param jobFlags
+     *            flags passed to the {@link JobActivityGraphBuilder}
+     * @throws HyracksException
+     *             if visiting the specification fails
+     */
+    @Override
+    public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(String appName, JobId jobId,
+            final ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException {
+        final ActivityClusterGraph clusterGraph = buildActivityClusterGraph(jobId, jobFlags);
+        final Set<Constraint> constraintSet = collectConstraints(clusterGraph, ccAppCtx);
+        return new IActivityClusterGraphGenerator() {
+            @Override
+            public ActivityClusterGraph initialize() {
+                return clusterGraph;
+            }
+
+            @Override
+            public Set<Constraint> getConstraints() {
+                return constraintSet;
+            }
+        };
+    }
+
+    /**
+     * Feeds the specification's connectors and operator activities into a
+     * {@link JobActivityGraphBuilder}, infers activity clusters from the result and copies the
+     * job-level settings of the specification onto the cluster graph.
+     */
+    private ActivityClusterGraph buildActivityClusterGraph(JobId jobId, EnumSet<JobFlag> jobFlags)
+            throws HyracksException {
+        final JobActivityGraphBuilder graphBuilder = new JobActivityGraphBuilder(spec, jobFlags);
+        // Connectors are registered before the operators contribute their activities.
+        PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+            @Override
+            public void visit(IConnectorDescriptor conn) throws HyracksException {
+                graphBuilder.addConnector(conn);
+            }
+        });
+        PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+            @Override
+            public void visit(IOperatorDescriptor op) {
+                op.contributeActivities(graphBuilder);
+            }
+        });
+        graphBuilder.finish();
+        JobActivityGraph activityGraph = graphBuilder.getActivityGraph();
+
+        ActivityClusterGraph clusterGraph = new ActivityClusterGraphBuilder().inferActivityClusters(jobId,
+                activityGraph);
+        clusterGraph.setFrameSize(spec.getFrameSize());
+        clusterGraph.setMaxReattempts(spec.getMaxReattempts());
+        clusterGraph.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
+        clusterGraph.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
+        clusterGraph.setConnectorPolicyAssignmentPolicy(spec.getConnectorPolicyAssignmentPolicy());
+        clusterGraph.setUseConnectorPolicyForScheduling(spec.isUseConnectorPolicyForScheduling());
+        return clusterGraph;
+    }
+
+    /**
+     * Collects the scheduling constraints contributed by every operator, then by every connector,
+     * and finally adds the user constraints declared on the specification.
+     */
+    private Set<Constraint> collectConstraints(final ActivityClusterGraph clusterGraph,
+            final ICCApplicationContext ccAppCtx) throws HyracksException {
+        final Set<Constraint> collected = new HashSet<Constraint>();
+        final IConstraintAcceptor sink = new IConstraintAcceptor() {
+            @Override
+            public void addConstraint(Constraint constraint) {
+                collected.add(constraint);
+            }
+        };
+        PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+            @Override
+            public void visit(IOperatorDescriptor op) {
+                op.contributeSchedulingConstraints(sink, ccAppCtx);
+            }
+        });
+        PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+            @Override
+            public void visit(IConnectorDescriptor conn) {
+                conn.contributeSchedulingConstraints(sink,
+                        clusterGraph.getConnectorMap().get(conn.getConnectorId()), ccAppCtx);
+            }
+        });
+        collected.addAll(spec.getUserConstraints());
+        return collected;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/PlanUtils.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/PlanUtils.java
new file mode 100644
index 0000000..30f26c7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/PlanUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.client.impl;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/**
+ * Static helpers for walking the operators and connectors of a {@link JobSpecification}.
+ */
+public class PlanUtils {
+    /**
+     * Invokes {@code visitor} on the operators in {@code spec.getOperatorMap()}, at most once per
+     * operator id.
+     *
+     * @param spec
+     *            job specification whose operators are visited
+     * @param visitor
+     *            callback applied to each operator
+     * @throws HyracksException
+     *             if the visitor throws; the walk stops at that operator
+     */
+    public static void visit(JobSpecification spec, IOperatorDescriptorVisitor visitor) throws HyracksException {
+        Set<OperatorDescriptorId> seen = new HashSet<OperatorDescriptorId>();
+        for (IOperatorDescriptor op : spec.getOperatorMap().values()) {
+            visitOperator(visitor, seen, op);
+        }
+    }
+
+    /**
+     * Visits {@code op} unless its operator id has already been recorded in {@code seen}.
+     *
+     * @param visitor
+     *            callback applied to the operator
+     * @param seen
+     *            ids of operators already visited; updated in place
+     * @param op
+     *            operator to visit
+     * @throws HyracksException
+     *             if the visitor throws
+     */
+    private static void visitOperator(IOperatorDescriptorVisitor visitor, Set<OperatorDescriptorId> seen,
+            IOperatorDescriptor op) throws HyracksException {
+        // The set holds operator ids, so membership is tested with the id rather than the
+        // descriptor object. Set.add returns false when the id was already present.
+        if (seen.add(op.getOperatorId())) {
+            visitor.visit(op);
+        }
+    }
+
+    /**
+     * Invokes {@code visitor} on every connector in {@code spec.getConnectorMap()}.
+     *
+     * @param spec
+     *            job specification whose connectors are visited
+     * @param visitor
+     *            callback applied to each connector
+     * @throws HyracksException
+     *             if the visitor throws; the walk stops at that connector
+     */
+    public static void visit(JobSpecification spec, IConnectorDescriptorVisitor visitor) throws HyracksException {
+        for (IConnectorDescriptor c : spec.getConnectorMap().values()) {
+            visitor.visit(c);
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java
new file mode 100644
index 0000000..cc76bd4
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameConstants.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+/**
+ * Constants related to frames. Being interface fields, all of them are implicitly {@code public static final}.
+ */
+public interface FrameConstants {
+    /** Length constant with value 4; by its name the byte width of a size field (TODO confirm with its users). */
+    int SIZE_LEN = 4;
+
+    /** Constant flag, {@code false} here; by its name it gates debugging of frame I/O (TODO confirm). */
+    boolean DEBUG_FRAME_IO = false;
+
+    /** Magic number {@code 0x12345678}; where it is written or checked is not visible in this file. */
+    int FRAME_FIELD_MAGIC = 0x12345678;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java
new file mode 100644
index 0000000..c8478d6
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/FrameHelper.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+/**
+ * Static helper for computing positions inside a frame.
+ */
+public class FrameHelper {
+    /**
+     * Returns the position four bytes before the end of a frame of the given size. By the method name this is
+     * where the tuple count is kept (TODO confirm with the code that writes frames).
+     *
+     * @param frameSize
+     *            - Size of the frame.
+     * @return {@code frameSize - 4}; no range check is made, so a size below 4 gives a negative result.
+     */
+    public static int getTupleCountOffset(int frameSize) {
+        final int trailerLength = 4;
+        return frameSize - trailerLength;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java
new file mode 100644
index 0000000..0188578
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameReader.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Source of frames that is opened, asked for frames one call at a time, and finally closed.
+ */
+public interface IFrameReader {
+    /**
+     * Opens the reader.
+     *
+     * @throws HyracksDataException
+     */
+    void open() throws HyracksDataException;
+
+    /**
+     * Asks the reader for the next frame.
+     *
+     * @param buffer
+     *            - Buffer passed in by the caller; presumably the frame is placed into it (TODO confirm).
+     * @return presumably {@code true} if a frame was produced and {@code false} once the input is exhausted (TODO
+     *         confirm with the implementations).
+     * @throws HyracksDataException
+     */
+    boolean nextFrame(ByteBuffer buffer) throws HyracksDataException;
+
+    /**
+     * Closes the reader.
+     *
+     * @throws HyracksDataException
+     */
+    void close() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java
new file mode 100644
index 0000000..8b82728
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameTupleAccessor.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Read access to the tuples, and the fields within them, held in a frame {@link ByteBuffer}.
+ * <p>
+ * NOTE(review): the per-method summaries are derived from the method names; what each offset is relative to is
+ * decided by the implementations and should be confirmed there.
+ */
+public interface IFrameTupleAccessor {
+    /** @return the number of fields. */
+    int getFieldCount();
+
+    /** @return the length of the field slots. */
+    int getFieldSlotsLength();
+
+    /** @return the end offset of field {@code fIdx} in tuple {@code tupleIndex}. */
+    int getFieldEndOffset(int tupleIndex, int fIdx);
+
+    /** @return the start offset of field {@code fIdx} in tuple {@code tupleIndex}. */
+    int getFieldStartOffset(int tupleIndex, int fIdx);
+
+    /** @return the length of field {@code fIdx} in tuple {@code tupleIndex}. */
+    int getFieldLength(int tupleIndex, int fIdx);
+
+    /** @return the end offset of tuple {@code tupleIndex}. */
+    int getTupleEndOffset(int tupleIndex);
+
+    /** @return the start offset of tuple {@code tupleIndex}. */
+    int getTupleStartOffset(int tupleIndex);
+
+    /** @return the number of tuples. */
+    int getTupleCount();
+
+    /** @return the buffer this accessor works on (presumably the one last passed to {@link #reset(ByteBuffer)}). */
+    ByteBuffer getBuffer();
+
+    /** Points this accessor at {@code buffer}. */
+    void reset(ByteBuffer buffer);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
new file mode 100644
index 0000000..a85bd7e
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Consumer side of a frame stream: a producer pushes frames into an {@link IFrameWriter}.
+ * <p>
+ * A writer is always in exactly one of these states:
+ * <ul>
+ * <li>INITIAL</li>
+ * <li>OPENED</li>
+ * <li>CLOSED</li>
+ * <li>FAILED</li>
+ * </ul>
+ * <p>
+ * Protocol followed by the producer:
+ * <ol>
+ * <li>The writer starts out INITIAL, and the first valid call is {@link IFrameWriter#open()}, which lets the
+ * implementation allocate the resources it needs. A normal return moves the writer to OPENED; if
+ * {@link IFrameWriter#open()} throws a {@link HyracksDataException}, the writer stays INITIAL.</li>
+ * <li>While OPENED, the producer may call one of:
+ * <ul>
+ * <li>{@link IFrameWriter#nextFrame(ByteBuffer)} to hand over data. A normal return leaves the writer OPENED; a
+ * {@link HyracksDataException} moves it to FAILED.</li>
+ * <li>{@link IFrameWriter#fail()} to signal that the stream is being aborted; the writer becomes FAILED.</li>
+ * <li>{@link IFrameWriter#close()} to release everything the writer owns; the writer becomes CLOSED.</li>
+ * </ul>
+ * </li>
+ * <li>While FAILED, the only permitted call is {@link IFrameWriter#close()}, which releases all resources and
+ * moves the writer to CLOSED.</li>
+ * <li>Once CLOSED, no further calls are allowed.</li>
+ * </ol>
+ * <p>
+ * Note: the producer does not call {@link IFrameWriter#close()} after a failed {@link IFrameWriter#open()}, so
+ * an implementation whose {@link IFrameWriter#open()} throws must itself release whatever it had partially
+ * allocated.
+ *
+ * @author vinayakb
+ */
+public interface IFrameWriter {
+    /**
+     * Must be the first call; gives the implementation the chance to allocate its resources.
+     *
+     * @throws HyracksDataException
+     *             - if opening fails; the writer then remains in the INITIAL state.
+     */
+    void open() throws HyracksDataException;
+
+    /**
+     * Hands one buffer of data to this {@link IFrameWriter}.
+     *
+     * @param buffer
+     *            - Buffer containing data.
+     * @throws HyracksDataException
+     *             - if the data could not be accepted; the writer then counts as FAILED.
+     */
+    void nextFrame(ByteBuffer buffer) throws HyracksDataException;
+
+    /**
+     * Signals that a failure occurred and that the current stream is being aborted.
+     *
+     * @throws HyracksDataException
+     */
+    void fail() throws HyracksDataException;
+
+    /**
+     * Closes this {@link IFrameWriter} and releases all of its resources.
+     *
+     * @throws HyracksDataException
+     */
+    void close() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionCollector.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionCollector.java
new file mode 100644
index 0000000..89e6da3
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionCollector.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import java.util.Collection;
+
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+/**
+ * Collects partitions, delivered as {@link PartitionChannel}s, for one receiver of a connector within a job and
+ * exposes the collected data through an {@link IFrameReader}.
+ * <p>
+ * NOTE(review): this summary and the per-method notes are read off the signatures; the call order and threading
+ * rules are set by the implementations and callers and should be confirmed there.
+ */
+public interface IPartitionCollector {
+    /** @return the id of the job this collector belongs to. */
+    JobId getJobId();
+
+    /** @return the id of the connector this collector belongs to. */
+    ConnectorDescriptorId getConnectorId();
+
+    /** @return the index of the receiver this collector serves. */
+    int getReceiverIndex();
+
+    /** Opens the collector. */
+    void open() throws HyracksException;
+
+    /** Hands the given partition channels to the collector. */
+    void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException;
+
+    /** @return the reader through which the collected frames are obtained. */
+    IFrameReader getReader() throws HyracksException;
+
+    /** Closes the collector. */
+    void close() throws HyracksException;
+
+    /** @return the ids of the partitions this collector requires. */
+    Collection<PartitionId> getRequiredPartitionIds() throws HyracksException;
+
+    /** Aborts the collector; unlike the other lifecycle calls it declares no checked exception. */
+    void abort();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionWriterFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionWriterFactory.java
new file mode 100644
index 0000000..e15ec6d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IPartitionWriterFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Factory that produces an {@link IFrameWriter} for a given receiver index.
+ */
+public interface IPartitionWriterFactory {
+    /**
+     * Creates the frame writer for the given receiver.
+     *
+     * @param receiverIndex
+     *            - Index of the receiver the writer is created for.
+     * @return the writer for that receiver.
+     * @throws HyracksDataException
+     */
+    IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
new file mode 100644
index 0000000..563fdc1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/NetworkAddress.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * Serializable value object pairing raw IP address bytes with a port number.
+ * <p>
+ * The byte array is copied both when it is passed in and when it is handed out, so no outside code can modify
+ * the bytes held here. {@link #hashCode()} and {@link #equals(Object)} therefore give stable answers for the
+ * lifetime of an instance, which is what hash-based collections require of their keys.
+ */
+public final class NetworkAddress implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final byte[] ipAddress;
+
+    private final int port;
+
+    /**
+     * @param ipAddress
+     *            - Address bytes; copied, and allowed to be {@code null}.
+     * @param port
+     *            - Port number; stored as given, no range check is made.
+     */
+    public NetworkAddress(byte[] ipAddress, int port) {
+        this.ipAddress = copyOf(ipAddress);
+        this.port = port;
+    }
+
+    /**
+     * @return a copy of the address bytes, or {@code null} if the address was constructed with {@code null}.
+     */
+    public byte[] getIpAddress() {
+        return copyOf(ipAddress);
+    }
+
+    /**
+     * @return the port number given at construction.
+     */
+    public int getPort() {
+        return port;
+    }
+
+    /**
+     * @return the address bytes formatted by {@link Arrays#toString(byte[])}, then {@code ':'}, then the port.
+     */
+    @Override
+    public String toString() {
+        return Arrays.toString(ipAddress) + ":" + port;
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(ipAddress) + port;
+    }
+
+    /**
+     * Equal means: the other object is a {@link NetworkAddress} with the same port and the same byte contents.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof NetworkAddress)) {
+            return false;
+        }
+        NetworkAddress on = (NetworkAddress) o;
+        return on.port == port && Arrays.equals(on.ipAddress, ipAddress);
+    }
+
+    /** Copies {@code bytes}, passing {@code null} through unchanged. */
+    private static byte[] copyOf(byte[] bytes) {
+        return bytes == null ? null : bytes.clone();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/PartitionChannel.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/PartitionChannel.java
new file mode 100644
index 0000000..b75ed07
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/PartitionChannel.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.comm;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+/**
+ * Pairing of a {@link PartitionId} with an {@link IInputChannel}; both references are fixed at construction.
+ */
+public class PartitionChannel {
+    private final PartitionId partitionId;
+
+    private final IInputChannel channel;
+
+    /**
+     * @param partitionId
+     *            - Id of the partition.
+     * @param channel
+     *            - Input channel that goes with the partition.
+     */
+    public PartitionChannel(PartitionId partitionId, IInputChannel channel) {
+        this.partitionId = partitionId;
+        this.channel = channel;
+    }
+
+    /**
+     * @return the partition id given at construction.
+     */
+    public PartitionId getPartitionId() {
+        return partitionId;
+    }
+
+    /**
+     * @return the input channel given at construction.
+     */
+    public IInputChannel getInputChannel() {
+        return channel;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/Constraint.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/Constraint.java
new file mode 100644
index 0000000..0668e00
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/Constraint.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstraintExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
+
+/**
+ * Serializable pair of a left-hand {@link LValueConstraintExpression} and a right-hand
+ * {@link ConstraintExpression}; its string form is the left side, the text {@code " in "}, then the right side.
+ */
+public class Constraint implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final LValueConstraintExpression lValue;
+
+    private final ConstraintExpression rValue;
+
+    /**
+     * @param lValue
+     *            - Expression on the left-hand side of the constraint.
+     * @param rValue
+     *            - Expression on the right-hand side of the constraint.
+     */
+    public Constraint(LValueConstraintExpression lValue, ConstraintExpression rValue) {
+        this.lValue = lValue;
+        this.rValue = rValue;
+    }
+
+    /**
+     * @return the left-hand side expression.
+     */
+    public LValueConstraintExpression getLValue() {
+        return lValue;
+    }
+
+    /**
+     * @return the right-hand side expression.
+     */
+    public ConstraintExpression getRValue() {
+        return rValue;
+    }
+
+    /**
+     * @return both sides in their string form, joined by {@code " in "}.
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append(lValue).append(" in ").append(rValue);
+        return text.toString();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/IConstraintAcceptor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/IConstraintAcceptor.java
new file mode 100644
index 0000000..2e41b0a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/IConstraintAcceptor.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+/**
+ * Receiver to which {@link Constraint}s are added one at a time.
+ */
+public interface IConstraintAcceptor {
+    /**
+     * Adds one constraint to this acceptor.
+     *
+     * @param constraint
+     *            - The constraint to add.
+     */
+    void addConstraint(Constraint constraint);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraintHelper.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraintHelper.java
new file mode 100644
index 0000000..0e4ca69
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraintHelper.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+import edu.uci.ics.hyracks.api.constraints.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
+import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+/**
+ * Static helpers that register partition count and partition location constraints for an operator as user
+ * constraints on a {@link JobSpecification}.
+ */
+public class PartitionConstraintHelper {
+    /**
+     * Constrains the partition count of {@code op} to the constant {@code count}.
+     */
+    public static void addPartitionCountConstraint(JobSpecification spec, IOperatorDescriptor op, int count) {
+        PartitionCountExpression partitionCount = new PartitionCountExpression(op.getOperatorId());
+        spec.addUserConstraint(new Constraint(partitionCount, new ConstantExpression(count)));
+    }
+
+    /**
+     * Sets the partition count of {@code op} to {@code choices.length} and then, for each partition {@code i},
+     * constrains its location to the constant array {@code choices[i]} (by the method name, the locations that
+     * partition may choose from; how the array is interpreted is up to the consumer of the constraint).
+     */
+    public static void addLocationChoiceConstraint(JobSpecification spec, IOperatorDescriptor op, String[][] choices) {
+        addPartitionCountConstraint(spec, op, choices.length);
+        for (int i = 0; i < choices.length; ++i) {
+            PartitionLocationExpression location = new PartitionLocationExpression(op.getOperatorId(), i);
+            spec.addUserConstraint(new Constraint(location, new ConstantExpression(choices[i])));
+        }
+    }
+
+    /**
+     * Sets the partition count of {@code op} to {@code locations.length} and then constrains the location of each
+     * partition {@code i} to the constant {@code locations[i]}.
+     */
+    public static void addAbsoluteLocationConstraint(JobSpecification spec, IOperatorDescriptor op, String... locations) {
+        addPartitionCountConstraint(spec, op, locations.length);
+        for (int i = 0; i < locations.length; ++i) {
+            PartitionLocationExpression location = new PartitionLocationExpression(op.getOperatorId(), i);
+            spec.addUserConstraint(new Constraint(location, new ConstantExpression(locations[i])));
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/ConstantExpression.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/ConstantExpression.java
new file mode 100644
index 0000000..6e10911
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/ConstantExpression.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints.expressions;
+
+import java.util.Collection;
+
+/**
+ * Constraint expression that wraps a fixed value; it has no child expressions.
+ */
+public class ConstantExpression extends ConstraintExpression {
+    private static final long serialVersionUID = 1L;
+
+    private final Object value;
+
+    /**
+     * @param value
+     *            - The constant to wrap. {@code null} is not rejected.
+     */
+    public ConstantExpression(Object value) {
+        this.value = value;
+    }
+
+    @Override
+    public ExpressionTag getTag() {
+        return ExpressionTag.CONSTANT;
+    }
+
+    /**
+     * @return the wrapped constant, exactly as given to the constructor.
+     */
+    public Object getValue() {
+        return value;
+    }
+
+    /**
+     * Adds nothing, because a constant has no children.
+     */
+    @Override
+    public void getChildren(Collection<ConstraintExpression> children) {
+    }
+
+    /**
+     * Appends {@code TAG[value:className]}. A {@code null} value is rendered as {@code TAG[null:null]} instead of
+     * failing, since the constructor accepts {@code null} and printing an expression should not throw.
+     */
+    @Override
+    protected void toString(StringBuilder buffer) {
+        String typeName = (value == null) ? "null" : value.getClass().getName();
+        buffer.append(getTag()).append('[').append(value).append(':').append(typeName).append(']');
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/ConstraintExpression.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/ConstraintExpression.java
new file mode 100644
index 0000000..811c048
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/ConstraintExpression.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints.expressions;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * Abstract base of the constraint expression classes. Subclasses report their kind through {@link #getTag()},
+ * expose their sub-expressions through {@link #getChildren(Collection)} and render themselves into a
+ * {@link StringBuilder}.
+ */
+public abstract class ConstraintExpression implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /** Tags naming the expression kinds. */
+    public enum ExpressionTag {
+        CONSTANT,
+        PARTITION_COUNT,
+        PARTITION_LOCATION
+    }
+
+    /**
+     * @return the tag identifying the kind of this expression.
+     */
+    public abstract ExpressionTag getTag();
+
+    /**
+     * Reports the child expressions of this expression.
+     *
+     * @param children
+     *            - Collection handed in by the caller; presumably implementations add their children to it.
+     */
+    public abstract void getChildren(Collection<ConstraintExpression> children);
+
+    /**
+     * Returns the text that {@link #toString(StringBuilder)} writes; final, so subclasses customise only that
+     * hook.
+     */
+    @Override
+    public final String toString() {
+        final StringBuilder text = new StringBuilder();
+        toString(text);
+        return text.toString();
+    }
+
+    /**
+     * Appends the textual form of this expression to {@code buffer}.
+     */
+    protected abstract void toString(StringBuilder buffer);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/LValueConstraintExpression.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/LValueConstraintExpression.java
new file mode 100644
index 0000000..7557460
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/LValueConstraintExpression.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints.expressions;
+
+/**
+ * Base class for the constraint expressions that are accepted as the left-hand side of a constraint. It adds no
+ * members of its own and serves purely as a type.
+ */
+public abstract class LValueConstraintExpression extends ConstraintExpression {
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/PartitionCountExpression.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/PartitionCountExpression.java
new file mode 100644
index 0000000..dc73947
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/PartitionCountExpression.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints.expressions;
+
+import java.util.Collection;
+
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+
+/**
+ * L-value expression that stands for the partition count of the operator with a given id. It has no child
+ * expressions; equality and hashing are based solely on the operator id.
+ */
+public final class PartitionCountExpression extends LValueConstraintExpression {
+    private static final long serialVersionUID = 1L;
+
+    private final OperatorDescriptorId opId;
+
+    /**
+     * @param opId
+     *            - Id of the operator whose partition count is meant. {@code null} is not rejected.
+     */
+    public PartitionCountExpression(OperatorDescriptorId opId) {
+        this.opId = opId;
+    }
+
+    @Override
+    public ExpressionTag getTag() {
+        return ExpressionTag.PARTITION_COUNT;
+    }
+
+    /**
+     * @return the operator id given at construction.
+     */
+    public OperatorDescriptorId getOperatorDescriptorId() {
+        return opId;
+    }
+
+    /**
+     * Adds nothing, because this expression has no children.
+     */
+    @Override
+    public void getChildren(Collection<ConstraintExpression> children) {
+    }
+
+    /**
+     * Appends {@code TAG(opId)}. The id is appended as an object rather than through {@code opId.toString()}, so a
+     * {@code null} id prints as {@code null} instead of throwing; {@link #hashCode()} and
+     * {@link #equals(Object)} tolerate a {@code null} id as well.
+     */
+    @Override
+    protected void toString(StringBuilder buffer) {
+        buffer.append(getTag()).append('(').append(opId).append(')');
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((opId == null) ? 0 : opId.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        PartitionCountExpression other = (PartitionCountExpression) obj;
+        return (opId == null) ? other.opId == null : opId.equals(other.opId);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/PartitionLocationExpression.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/PartitionLocationExpression.java
new file mode 100644
index 0000000..fbfdc1c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/constraints/expressions/PartitionLocationExpression.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints.expressions;
+
+import java.util.Collection;
+
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+
+/**
+ * L-value constraint expression tagged {@link ExpressionTag#PARTITION_LOCATION}, keyed by an operator id
+ * and an integer partition.
+ */
+public final class PartitionLocationExpression extends LValueConstraintExpression {
+    private static final long serialVersionUID = 1L;
+
+    // May be null: hashCode(), equals() and toString() all tolerate a missing id.
+    private final OperatorDescriptorId opId;
+    private final int partition;
+
+    public PartitionLocationExpression(OperatorDescriptorId opId, int partition) {
+        this.opId = opId;
+        this.partition = partition;
+    }
+
+    @Override
+    public ExpressionTag getTag() {
+        return ExpressionTag.PARTITION_LOCATION;
+    }
+
+    public OperatorDescriptorId getOperatorDescriptorId() {
+        return opId;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    /** Adds nothing to {@code children}. */
+    @Override
+    public void getChildren(Collection<ConstraintExpression> children) {
+    }
+
+    /** Appends {@code TAG(opId, partition)}; a null operator id is rendered as "null" instead of throwing. */
+    @Override
+    protected void toString(StringBuilder buffer) {
+        buffer.append(getTag()).append('(').append(opId).append(", ").append(partition).append(')');
+    }
+
+    @Override
+    public int hashCode() {
+        // Same value as the usual 31-based accumulation that starts from 1.
+        int result = 31 + ((opId == null) ? 0 : opId.hashCode());
+        return 31 * result + partition;
+    }
+
+    /** Equal only to an instance of exactly this class with the same partition and an equal (or null) id. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        PartitionLocationExpression other = (PartitionLocationExpression) obj;
+        boolean sameOp = opId == null ? other.opId == null : opId.equals(other.opId);
+        return sameOp && partition == other.partition;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
new file mode 100644
index 0000000..1e49fe2
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
+import edu.uci.ics.hyracks.api.topology.ClusterTopology;
+
+public interface ICCContext {
+    ClusterControllerInfo getClusterControllerInfo();
+
+    void getIPAddressNodeMap(Map<String, Set<String>> map) throws Exception; // presumably fills map; confirm
+
+    ClusterTopology getClusterTopology();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
new file mode 100644
index 0000000..9cb8f10
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.io.IIOManager;
+
+public interface IHyracksCommonContext {
+    int getFrameSize();
+
+    IIOManager getIOManager();
+
+    ByteBuffer allocateFrame();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
new file mode 100644
index 0000000..fad4300
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
+
+public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry {
+    INCApplicationContext getApplicationContext();
+
+    JobId getJobId();
+
+    ICounterContext getCounterContext();
+
+    Object getGlobalJobData();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
new file mode 100644
index 0000000..c4989c5
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+
+public interface IHyracksRootContext {
+    IIOManager getIOManager();
+
+    Map<String, NodeControllerInfo> getNodeControllerInfos() throws Exception;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
new file mode 100644
index 0000000..e964d66
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksTaskContext.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
+import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
+
+public interface IHyracksTaskContext
+        extends IHyracksCommonContext, IWorkspaceFileFactory, IDeallocatableRegistry, IOperatorEnvironment {
+    IHyracksJobletContext getJobletContext();
+
+    TaskAttemptId getTaskAttemptId();
+
+    ICounterContext getCounterContext();
+
+    void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
new file mode 100644
index 0000000..41b4c23
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+/** Identifies an activity by the id of its operator descriptor plus an operator-local integer id. */
+public final class ActivityId implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final OperatorDescriptorId odId;
+    private final int id;
+
+    public ActivityId(OperatorDescriptorId odId, int id) {
+        this.odId = odId;
+        this.id = id;
+    }
+
+    public OperatorDescriptorId getOperatorDescriptorId() {
+        return odId;
+    }
+
+    public int getLocalId() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        return odId.hashCode() + id;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ActivityId)) {
+            return false;
+        }
+        ActivityId other = (ActivityId) o;
+        return this == o || (other.odId.equals(odId) && other.id == id);
+    }
+
+    @Override
+    public String toString() {
+        return "ANID:" + odId + ":" + id;
+    }
+
+    /** Parses {@code "ANID:<operator id>:<local id>"}; throws IllegalArgumentException on a missing prefix or ':'. */
+    public static ActivityId parse(String str) {
+        int idIdx = str.lastIndexOf(':');
+        // idIdx < 5 means the last ':' is the one inside the "ANID:" prefix, so no local-id part follows.
+        if (!str.startsWith("ANID:") || idIdx < 5) {
+            throw new IllegalArgumentException("Unable to parse: " + str);
+        }
+        return new ActivityId(OperatorDescriptorId.parse(str.substring(5, idIdx)),
+                Integer.parseInt(str.substring(idIdx + 1)));
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
new file mode 100644
index 0000000..3133a7f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+/** Value-type identifier for a connector descriptor, wrapping a single immutable {@code int}. */
+public final class ConnectorDescriptorId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    // final: equals() and hashCode() are derived from this value, so it must not change after construction.
+    private final int id;
+
+    public ConnectorDescriptorId(int id) {
+        this.id = id;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        return id;
+    }
+
+    /** Two ids are equal exactly when they wrap the same {@code int}. */
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ConnectorDescriptorId)) {
+            return false;
+        }
+        return id == ((ConnectorDescriptorId) obj).id;
+    }
+
+    /** Renders as {@code CDID:<id>}. */
+    @Override
+    public String toString() {
+        return "CDID:" + id;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivity.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivity.java
new file mode 100644
index 0000000..df8fd53
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivity.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IActivity extends Serializable {
+    ActivityId getActivityId();
+
+    IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
+            int partition, int nPartitions) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
new file mode 100644
index 0000000..bbda23c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+public interface IActivityGraphBuilder {
+    void addActivity(IOperatorDescriptor op, IActivity task);
+
+    void addBlockingEdge(IActivity blocker, IActivity blocked);
+
+    void addSourceEdge(int operatorInputIndex, IActivity task, int taskInputIndex);
+
+    void addTargetEdge(int operatorOutputIndex, IActivity task, int taskOutputIndex);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
new file mode 100644
index 0000000..3a6ee7a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+
+/**
+ * Connector that connects operators in a Job.
+ *
+ * @author vinayakb
+ */
+public interface IConnectorDescriptor extends Serializable {
+    /**
+     * Gets the id of the connector.
+     *
+     * @return the connector id
+     */
+    ConnectorDescriptorId getConnectorId();
+
+    /**
+     * Factory method to create the send side writer that writes into this
+     * connector.
+     *
+     * @param ctx
+     *            Context
+     * @param recordDesc
+     *            Record Descriptor
+     * @param edwFactory
+     *            Endpoint writer factory.
+     * @param index
+     *            ordinal index of the data producer partition.
+     * @param nProducerPartitions
+     *            Number of partitions of the producing operator.
+     * @param nConsumerPartitions
+     *            Number of partitions of the consuming operator.
+     * @return data writer.
+     * @throws HyracksDataException
+     */
+    IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+            throws HyracksDataException;
+
+    /**
+     * Factory method to create the receive side reader that reads data from this
+     * connector.
+     *
+     * @param ctx
+     *            Context
+     * @param recordDesc
+     *            Record Descriptor
+     * @param receiverIndex
+     *            ordinal index of the data consumer partition
+     * @param nProducerPartitions
+     *            Number of partitions of the producing operator.
+     * @param nConsumerPartitions
+     *            Number of partitions of the consuming operator.
+     * @return partition collector
+     * @throws HyracksDataException
+     */
+    IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
+            int receiverIndex, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
+
+    /**
+     * Contributes any scheduling constraints imposed by this connector.
+     *
+     * @param constraintAcceptor
+     *            - Constraint Acceptor
+     * @param ac - Activity Cluster
+     * @param appCtx - CC application context
+     */
+    void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ActivityCluster ac,
+            ICCApplicationContext appCtx);
+
+    /**
+     * Indicates which consumer partitions may receive data from the given
+     * producer partition.
+     */
+    void indicateTargetPartitions(int nProducerPartitions, int nConsumerPartitions, int producerIndex,
+            BitSet targetBitmap);
+
+    /**
+     * Indicates which producer partitions are required for the given receiver.
+     */
+    void indicateSourcePartitions(int nProducerPartitions, int nConsumerPartitions, int consumerIndex,
+            BitSet sourceBitmap);
+
+    /**
+     * Gets the display name.
+     */
+    String getDisplayName();
+
+    /**
+     * Sets the display name.
+     */
+    void setDisplayName(String displayName);
+
+    /**
+     * Translates this connector descriptor to JSON.
+     *
+     * @return the JSON form of this connector descriptor
+     * @throws JSONException
+     */
+    JSONObject toJSON() throws JSONException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataReader.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataReader.java
new file mode 100644
index 0000000..b5f45cb
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataReader.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Provides data to a consumer.
+ *
+ * @author vinayakb
+ */
+public interface IDataReader<T> {
+    /**
+     * Reads one data item.
+     *
+     * @return data. <code>null</code> indicates end of stream.
+     * @throws HyracksDataException
+     */
+    T readData() throws HyracksDataException;
+
+    /**
+     * Closes this reader.
+     *
+     * @throws HyracksDataException
+     */
+    void close() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java
new file mode 100644
index 0000000..4fb6f18
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IDataWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Accepts data from data producers.
+ *
+ * @author vinayakb
+ */
+public interface IDataWriter<T> {
+    /**
+     * Pushes data to the acceptor.
+     *
+     * @param data
+     *            - Data pushed to the acceptor. <code>null</code> indicates the
+     *            end of stream.
+     * @throws HyracksDataException
+     */
+    void writeData(T data) throws HyracksDataException;
+
+    /**
+     * Indicates that the stream has failed.
+     *
+     * @throws HyracksDataException
+     */
+    void fail() throws HyracksDataException;
+
+    /**
+     * Closes this writer.
+     *
+     * @throws HyracksDataException
+     */
+    void close() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOpenableDataReader.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOpenableDataReader.java
new file mode 100644
index 0000000..d0b6474
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOpenableDataReader.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IOpenableDataReader<T> extends IDataReader<T> {
+    void open() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOpenableDataWriter.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOpenableDataWriter.java
new file mode 100644
index 0000000..b2b15c6
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOpenableDataWriter.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IOpenableDataWriter<T> extends IDataWriter<T> {
+    void open() throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
new file mode 100644
index 0000000..43bf141
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Descriptor for operators in Hyracks.
+ *
+ * @author vinayakb
+ */
+public interface IOperatorDescriptor extends Serializable {
+    /**
+     * Returns the id of the operator.
+     *
+     * @return operator id
+     */
+    OperatorDescriptorId getOperatorId();
+
+    /**
+     * Returns the number of inputs into this operator.
+     *
+     * @return Number of inputs.
+     */
+    int getInputArity();
+
+    /**
+     * Returns the number of outputs out of this operator.
+     *
+     * @return Number of outputs.
+     */
+    int getOutputArity();
+
+    /**
+     * Gets the output record descriptors.
+     *
+     * @return Array of RecordDescriptor, one per output.
+     */
+    RecordDescriptor[] getOutputRecordDescriptors();
+
+    /**
+     * Contributes the activity graph that describes the behavior of this
+     * operator.
+     *
+     * @param builder
+     *            - graph builder
+     */
+    void contributeActivities(IActivityGraphBuilder builder);
+
+    /**
+     * Contributes any scheduling constraints imposed by this operator.
+     *
+     * @param constraintAcceptor
+     *            - Constraint Acceptor
+     * @param appCtx
+     *            - CC application context
+     */
+    void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, ICCApplicationContext appCtx);
+
+    /**
+     * Gets the display name.
+     */
+    String getDisplayName();
+
+    /**
+     * Sets the display name.
+     */
+    void setDisplayName(String displayName);
+
+    /**
+     * Translates this operator descriptor to JSON.
+     */
+    JSONObject toJSON() throws JSONException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePullable.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePullable.java
new file mode 100644
index 0000000..1fa039f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePullable.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+public interface IOperatorNodePullable extends IOpenableDataReader<Object[]> {
+    void setDataReader(int index, IOpenableDataWriter<Object[]> writer); // NOTE(review): "Reader" in the name, writer in the parameter -- confirm
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
new file mode 100644
index 0000000..62797ac
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorNodePushable.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IOperatorNodePushable {
+    void initialize() throws HyracksDataException; // paired with deinitialize() below
+    /** Counterpart of {@link #initialize()}; may likewise fail with a HyracksDataException. */
+    void deinitialize() throws HyracksDataException;
+    /** @return the number of inputs of this node */
+    int getInputArity();
+    /** Supplies the frame writer, plus the record descriptor that goes with it, for output {@code index}. */
+    void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc);
+    /** @return the frame writer belonging to input {@code index} */
+    IFrameWriter getInputFrameWriter(int index);
+    /** @return the display name of this node */
+    String getDisplayName();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
new file mode 100644
index 0000000..8794e09
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorDescriptorId.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+/** Immutable identifier of an operator descriptor, backed by a single int. */
+public final class OperatorDescriptorId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final int id;
+
+    /** Wraps the given numeric id. */
+    public OperatorDescriptorId(int id) {
+        this.id = id;
+    }
+
+    /** @return the wrapped numeric id */
+    public int getId() {
+        return id;
+    }
+
+    /** Uses the numeric id itself as the hash value. */
+    @Override
+    public int hashCode() {
+        return id;
+    }
+
+    /** Equal only to another OperatorDescriptorId that wraps the same numeric id. */
+    @Override
+    public boolean equals(Object o) {
+        return o instanceof OperatorDescriptorId && ((OperatorDescriptorId) o).id == id;
+    }
+
+    /** @return the text form {@code "ODID:" + id}, which {@link #parse(String)} reads back */
+    @Override
+    public String toString() {
+        return "ODID:" + id;
+    }
+
+    /** Reads the {@link #toString()} form; a missing prefix or non-numeric rest raises IllegalArgumentException. */
+    public static OperatorDescriptorId parse(String str) {
+        if (!str.startsWith("ODID:")) {
+            throw new IllegalArgumentException("Unable to parse: " + str);
+        }
+        return new OperatorDescriptorId(Integer.parseInt(str.substring(5)));
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorInstanceId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorInstanceId.java
new file mode 100644
index 0000000..f6ee39b
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/OperatorInstanceId.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+/** Immutable key made of an operator descriptor id and a partition number. */
+public final class OperatorInstanceId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    // Final because equals() and hashCode() are derived from these fields and nothing reassigns them.
+    private final OperatorDescriptorId odId;
+    private final int partition;
+
+    /** Stores both components as given; {@code odId} may be null (equals/hashCode tolerate it). */
+    public OperatorInstanceId(OperatorDescriptorId odId, int partition) {
+        this.odId = odId;
+        this.partition = partition;
+    }
+
+    /** @return the operator descriptor id component */
+    public OperatorDescriptorId getOperatorId() {
+        return odId;
+    }
+
+    /** @return the partition component */
+    public int getPartition() {
+        return partition;
+    }
+
+    /** Combines both components; a null {@code odId} contributes 0. */
+    @Override
+    public int hashCode() {
+        int odHash = (odId == null) ? 0 : odId.hashCode();
+        // Same value as accumulating from 1 with multiplier 31 over odHash and then partition.
+        return 31 * (31 + odHash) + partition;
+    }
+
+    /** Equal only to another OperatorInstanceId with an equal {@code odId} and the same partition. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        OperatorInstanceId other = (OperatorInstanceId) obj;
+        // Null-safe comparison of the descriptor ids.
+        boolean sameOperator = (odId == null) ? other.odId == null : odId.equals(other.odId);
+        return sameOperator && partition == other.partition;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
new file mode 100644
index 0000000..0fb44c1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskAttemptId.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+/** Immutable id pairing a {@link TaskId} with an attempt number. */
+public final class TaskAttemptId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final TaskId taskId;
+    private final int attempt;
+
+    public TaskAttemptId(TaskId taskId, int attempt) {
+        this.taskId = taskId;
+        this.attempt = attempt;
+    }
+    /** @return the task id component */
+    public TaskId getTaskId() {
+        return taskId;
+    }
+    /** @return the attempt number component */
+    public int getAttempt() {
+        return attempt;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof TaskAttemptId)) {
+            return false;
+        }
+        TaskAttemptId oTaskId = (TaskAttemptId) o;
+        return oTaskId.attempt == attempt && oTaskId.taskId.equals(taskId);
+    }
+
+    @Override
+    public int hashCode() {
+        return taskId.hashCode() + attempt;
+    }
+    /** @return {@code "TAID:" + taskId + ":" + attempt}, the form that {@link #parse(String)} reads */
+    @Override
+    public String toString() {
+        return "TAID:" + taskId + ":" + attempt;
+    }
+    /** Reads the toString() form; a missing prefix, separator or numeric attempt raises IllegalArgumentException. */
+    public static TaskAttemptId parse(String str) {
+        int idIdx = str.lastIndexOf(':');
+        // The prefix's own ':' is at index 4; without a later one there is no attempt part to split off.
+        if (!str.startsWith("TAID:") || idIdx < 5) {
+            throw new IllegalArgumentException("Unable to parse: " + str);
+        }
+        return new TaskAttemptId(TaskId.parse(str.substring(5, idIdx)), Integer.parseInt(str.substring(idIdx + 1)));
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
new file mode 100644
index 0000000..7e0b22d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+import java.io.Serializable;
+
+/** Immutable id pairing an {@link ActivityId} with a partition number. */
+public final class TaskId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final ActivityId activityId;
+    private final int partition;
+
+    public TaskId(ActivityId activityId, int partition) {
+        this.activityId = activityId;
+        this.partition = partition;
+    }
+    /** @return the activity id component */
+    public ActivityId getActivityId() {
+        return activityId;
+    }
+    /** @return the partition component */
+    public int getPartition() {
+        return partition;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof TaskId)) {
+            return false;
+        }
+        TaskId oTaskId = (TaskId) o;
+        return oTaskId.partition == partition && oTaskId.activityId.equals(activityId);
+    }
+
+    @Override
+    public int hashCode() {
+        return activityId.hashCode() + partition;
+    }
+    /** @return {@code "TID:" + activityId + ":" + partition}, the form that {@link #parse(String)} reads */
+    @Override
+    public String toString() {
+        return "TID:" + activityId + ":" + partition;
+    }
+    /** Reads the toString() form; a missing prefix, separator or numeric partition raises IllegalArgumentException. */
+    public static TaskId parse(String str) {
+        int idIdx = str.lastIndexOf(':');
+        // The prefix's own ':' is at index 3; without a later one there is no partition part to split off.
+        if (!str.startsWith("TID:") || idIdx < 4) {
+            throw new IllegalArgumentException("Unable to parse: " + str);
+        }
+        return new TaskId(ActivityId.parse(str.substring(4, idIdx)), Integer.parseInt(str.substring(idIdx + 1)));
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicy.java
new file mode 100644
index 0000000..d6d6fee
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicy.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+import java.io.Serializable;
+
+public interface IConnectorPolicy extends Serializable {
+    boolean requiresProducerConsumerCoscheduling(); // presumably: both ends must be scheduled together -- TODO confirm
+    /** NOTE(review): presumably true when the consumer may only proceed once the producer is done -- confirm. */
+    boolean consumerWaitsForProducerToFinish();
+    /** Whether this policy materializes on the send side; what exactly is materialized is not visible here. */
+    boolean materializeOnSendSide();
+    /** Whether this policy materializes on the receive side; what exactly is materialized is not visible here. */
+    boolean materializeOnReceiveSide();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicyAssignmentPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..ca684e4
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+
+public interface IConnectorPolicyAssignmentPolicy extends Serializable { // selects the IConnectorPolicy for a connector
+    IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+            int[] fanouts);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipeliningConnectorPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipeliningConnectorPolicy.java
new file mode 100644
index 0000000..b55947e
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipeliningConnectorPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+public final class PipeliningConnectorPolicy implements IConnectorPolicy {
+    private static final long serialVersionUID = 1L;
+    // Fixed answers of this policy: co-scheduling required, consumer does not wait, nothing materialized.
+    @Override
+    public boolean requiresProducerConsumerCoscheduling() {
+        return true;
+    }
+    /** Always {@code false}: the consumer does not wait for the producer to finish. */
+    @Override
+    public boolean consumerWaitsForProducerToFinish() {
+        return false;
+    }
+    /** Always {@code false}: no materialization on the send side. */
+    @Override
+    public boolean materializeOnSendSide() {
+        return false;
+    }
+    /** Always {@code false}: no materialization on the receive side. */
+    @Override
+    public boolean materializeOnReceiveSide() {
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedBlockingConnectorPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedBlockingConnectorPolicy.java
new file mode 100644
index 0000000..a5017de
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedBlockingConnectorPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+public final class SendSideMaterializedBlockingConnectorPolicy implements IConnectorPolicy {
+    private static final long serialVersionUID = 1L;
+    // Fixed answers of this policy: no co-scheduling, consumer waits, materialization on the send side only.
+    @Override
+    public boolean requiresProducerConsumerCoscheduling() {
+        return false;
+    }
+    /** Always {@code true}: the consumer waits for the producer to finish. */
+    @Override
+    public boolean consumerWaitsForProducerToFinish() {
+        return true;
+    }
+    /** Always {@code true}: materializes on the send side. */
+    @Override
+    public boolean materializeOnSendSide() {
+        return true;
+    }
+    /** Always {@code false}: no materialization on the receive side. */
+    @Override
+    public boolean materializeOnReceiveSide() {
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedPipeliningConnectorPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedPipeliningConnectorPolicy.java
new file mode 100644
index 0000000..5d522c0
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedPipeliningConnectorPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+public final class SendSideMaterializedPipeliningConnectorPolicy implements IConnectorPolicy {
+    private static final long serialVersionUID = 1L;
+    // Fixed answers of this policy: no co-scheduling, consumer does not wait, materialization on the send side only.
+    @Override
+    public boolean requiresProducerConsumerCoscheduling() {
+        return false;
+    }
+    /** Always {@code false}: the consumer does not wait for the producer to finish. */
+    @Override
+    public boolean consumerWaitsForProducerToFinish() {
+        return false;
+    }
+    /** Always {@code true}: materializes on the send side. */
+    @Override
+    public boolean materializeOnSendSide() {
+        return true;
+    }
+    /** Always {@code false}: no materialization on the receive side. */
+    @Override
+    public boolean materializeOnReceiveSide() {
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy.java
new file mode 100644
index 0000000..82608c4
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+public final class SendSideMaterializedReceiveSideMaterializedBlockingConnectorPolicy implements IConnectorPolicy {
+    private static final long serialVersionUID = 1L;
+    // Fixed answers of this policy: no co-scheduling, consumer waits, materialization on both sides.
+    @Override
+    public boolean requiresProducerConsumerCoscheduling() {
+        return false;
+    }
+    /** Always {@code true}: the consumer waits for the producer to finish. */
+    @Override
+    public boolean consumerWaitsForProducerToFinish() {
+        return true;
+    }
+    /** Always {@code true}: materializes on the send side. */
+    @Override
+    public boolean materializeOnSendSide() {
+        return true;
+    }
+    /** Always {@code true}: materializes on the receive side. */
+    @Override
+    public boolean materializeOnReceiveSide() {
+        return true;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy.java
new file mode 100644
index 0000000..1cc0583
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.connectors;
+
+public final class SendSideMaterializedReceiveSideMaterializedPipeliningConnectorPolicy implements IConnectorPolicy {
+    private static final long serialVersionUID = 1L;
+    // Fixed answers of this policy: no co-scheduling, consumer does not wait, materialization on both sides.
+    @Override
+    public boolean requiresProducerConsumerCoscheduling() {
+        return false;
+    }
+    /** Always {@code false}: the consumer does not wait for the producer to finish. */
+    @Override
+    public boolean consumerWaitsForProducerToFinish() {
+        return false;
+    }
+    /** Always {@code true}: materializes on the send side. */
+    @Override
+    public boolean materializeOnSendSide() {
+        return true;
+    }
+    /** Always {@code true}: materializes on the receive side. */
+    @Override
+    public boolean materializeOnReceiveSide() {
+        return true;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/IStateObject.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/IStateObject.java
new file mode 100644
index 0000000..f216773
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/state/IStateObject.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.state;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+
+public interface IStateObject {
+    JobId getJobId(); // the JobId associated with this state object
+    /** @return an identifier for this state object; its concrete type is left open ({@code Object}) */
+    Object getId();
+    /** @return the memory occupancy reported for this object -- NOTE(review): unit not stated here, confirm */
+    long getMemoryOccupancy();
+    /** Writes this object to {@code out}. */
+    void toBytes(DataOutput out) throws IOException;
+    /** Reads this object from {@code in}; presumably the inverse of {@link #toBytes(DataOutput)} -- TODO confirm. */
+    void fromBytes(DataInput in) throws IOException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryComparator.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryComparator.java
new file mode 100644
index 0000000..767684d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryComparator.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+public interface IBinaryComparator {
+    int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); // presumably s* = start offset, l* = length -- TODO confirm
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryComparatorFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryComparatorFactory.java
new file mode 100644
index 0000000..9976818
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryComparatorFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IBinaryComparatorFactory extends Serializable {
+    public IBinaryComparator createBinaryComparator();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunction.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunction.java
new file mode 100644
index 0000000..a995703
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunction.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+public interface IBinaryHashFunction {
+    int hash(byte[] bytes, int offset, int length); // presumably hashes `length` bytes starting at bytes[offset] -- TODO confirm
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunctionFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunctionFactory.java
new file mode 100644
index 0000000..a064d5f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunctionFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IBinaryHashFunctionFactory extends Serializable {
+    public IBinaryHashFunction createBinaryHashFunction();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunctionFamily.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunctionFamily.java
new file mode 100644
index 0000000..5cb27d6
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IBinaryHashFunctionFamily.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IBinaryHashFunctionFamily extends Serializable { // serializable, seed-parameterized source
+    IBinaryHashFunction createBinaryHashFunction(int seed); // presumably the seed picks the family member - TODO confirm
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IComparator.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IComparator.java
new file mode 100644
index 0000000..2d24496
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IComparator.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+public interface IComparator<T> { // comparison of two values of type T
+    int compare(T o1, T o2); // NOTE(review): presumably java.util.Comparator's sign convention - confirm
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IComparatorFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IComparatorFactory.java
new file mode 100644
index 0000000..ef9161f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IComparatorFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IComparatorFactory<T> extends Serializable { // serializable source of IComparator<T>
+    IComparator<T> createComparator(); // no-argument creation of an IComparator<T>
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IHashFunction.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IHashFunction.java
new file mode 100644
index 0000000..9124fca
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IHashFunction.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface IHashFunction<T> { // maps a value of type T to an int hash
+    int hash(T o) throws HyracksDataException; // the checked failure is part of the contract
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IHashFunctionFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IHashFunctionFactory.java
new file mode 100644
index 0000000..b67a96c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IHashFunctionFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IHashFunctionFactory<T> extends Serializable { // serializable source of IHashFunction<T>
+    IHashFunction<T> createHashFunction(); // no-argument creation of an IHashFunction<T>
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INormalizedKeyComputer.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INormalizedKeyComputer.java
new file mode 100644
index 0000000..0a8fdcd
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INormalizedKeyComputer.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+public interface INormalizedKeyComputer { // derives an int key from a region of a byte array
+    int normalize(byte[] bytes, int start, int length); // presumably reads bytes[start, start + length) - TODO confirm
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..d58632c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INormalizedKeyComputerFactory extends Serializable { // serializable source of key computers
+    INormalizedKeyComputer createNormalizedKeyComputer(); // no-argument creation of an INormalizedKeyComputer
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
new file mode 100644
index 0000000..7552c17
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface INullWriter { // writes a null value to a DataOutput
+    void writeNull(DataOutput out) throws HyracksDataException; // how null is encoded is up to the implementation
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
new file mode 100644
index 0000000..6d9a744
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INullWriterFactory extends Serializable { // serializable source of INullWriter instances
+    INullWriter createNullWriter(); // no-argument creation of an INullWriter
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
new file mode 100644
index 0000000..c480446
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+
+/** Looks up the {@link RecordDescriptor} of an activity's input or output, addressed by index. */
+public interface IRecordDescriptorProvider {
+    RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex);
+    RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ISerializerDeserializer.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ISerializerDeserializer.java
new file mode 100644
index 0000000..33ba602
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ISerializerDeserializer.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Serializable codec that reads values of type T from a DataInput and writes them to a DataOutput. */
+public interface ISerializerDeserializer<T> extends Serializable {
+    /**
+     * Reads one instance of T from the given input.
+     *
+     * @param in the input to read the instance from
+     * @return a new instance of T populated with the data that was read
+     * @throws HyracksDataException declared for failures while reading
+     */
+    T deserialize(DataInput in) throws HyracksDataException;
+
+    /**
+     * Writes the given instance to the given output.
+     *
+     * @param instance the instance to serialize
+     * @param out the output to write the data to
+     * @throws HyracksDataException declared for failures while writing
+     */
+    void serialize(T instance, DataOutput out) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
new file mode 100644
index 0000000..3251944
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Compares a tuple addressed through one accessor with a tuple addressed through another. */
+public interface ITuplePairComparator {
+    // NOTE(review): "outer"/"inner" presumably name the two sides of a join - confirm with callers.
+    int compare(IFrameTupleAccessor outerRef, int outerIndex, IFrameTupleAccessor innerRef, int innerIndex)
+            throws HyracksDataException;
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
new file mode 100644
index 0000000..297b56c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public interface ITuplePairComparatorFactory extends Serializable { // serializable source of ITuplePairComparator
+    ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx); // the task context is the only input
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputer.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputer.java
new file mode 100644
index 0000000..baf9d5c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputer.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITuplePartitionComputer { // presumably maps tuple tIndex to a partition in [0, nParts) - TODO confirm
+    int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
new file mode 100644
index 0000000..eb2bbe8
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputerFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITuplePartitionComputerFactory extends Serializable { // serializable source of partitioners
+    ITuplePartitionComputer createPartitioner(); // no-argument creation of an ITuplePartitionComputer
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputerFamily.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputerFamily.java
new file mode 100644
index 0000000..1839dc1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePartitionComputerFamily.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITuplePartitionComputerFamily extends Serializable { // serializable, seed-parameterized source
+    ITuplePartitionComputer createPartitioner(int seed); // presumably the seed picks the family member - TODO confirm
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITypeTraits.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITypeTraits.java
new file mode 100644
index 0000000..4a6f826
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITypeTraits.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+/** Serializable type descriptor exposing a fixed-length flag and a fixed-length value. */
+public interface ITypeTraits extends Serializable {
+    boolean isFixedLength();
+    int getFixedLength(); // presumably only meaningful when isFixedLength() is true - TODO confirm
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java
new file mode 100644
index 0000000..b04fe86
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/RecordDescriptor.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+/** Serializable holder pairing an array of field serializers with an array of type traits (which may be null). */
+@SuppressWarnings("unchecked")
+public final class RecordDescriptor implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final ISerializerDeserializer[] fields;
+    private final ITypeTraits[] typeTraits;
+
+    // Retained for backwards-compatibility: descriptors built this way carry null type traits.
+    public RecordDescriptor(ISerializerDeserializer[] fields) {
+        this(fields, null);
+    }
+
+    // Marked as a temporary addition by its author; both arrays are stored as given, without copying.
+    public RecordDescriptor(ISerializerDeserializer[] fields, ITypeTraits[] typeTraits) {
+        this.fields = fields;
+        this.typeTraits = typeTraits;
+    }
+
+    public int getFieldCount() {
+        return fields.length;
+    }
+
+    public ISerializerDeserializer[] getFields() {
+        return fields;
+    }
+
+    // Null when the single-argument constructor was used.
+    public ITypeTraits[] getTypeTraits() {
+        return typeTraits;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
new file mode 100644
index 0000000..c6e6cd3
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksDataException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.exceptions;
+
+/** Checked exception declared by the data-handling interfaces; each constructor forwards to HyracksException. */
+public class HyracksDataException extends HyracksException {
+    private static final long serialVersionUID = 1L;
+
+    public HyracksDataException() { super(); }
+
+    public HyracksDataException(String message) {
+        super(message);
+    }
+
+    public HyracksDataException(Throwable cause) {
+        super(cause);
+    }
+
+    public HyracksDataException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java
new file mode 100644
index 0000000..43913ce
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/exceptions/HyracksException.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.hyracks.api.exceptions;
+
+import java.io.IOException;
+
+/** Base checked exception of this package; an IOException subtype whose constructors forward to IOException. */
+public class HyracksException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public HyracksException() { super(); }
+
+    public HyracksException(String message) {
+        super(message);
+    }
+
+    public HyracksException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public HyracksException(Throwable cause) {
+        super(cause);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/FileReference.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/FileReference.java
new file mode 100644
index 0000000..9e27077
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/FileReference.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import java.io.File;
+import java.io.Serializable;
+
+/** Serializable wrapper around a {@link File}; equality and hashing delegate to the wrapped file. */
+public final class FileReference implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final File file;
+
+    // Resolves devRelPath against the path reported by the device handle.
+    public FileReference(IODeviceHandle dev, String devRelPath) {
+        this(new File(dev.getPath(), devRelPath));
+    }
+
+    public FileReference(File file) {
+        this.file = file;
+    }
+
+    public File getFile() {
+        return file;
+    }
+
+    // Renders as the absolute path of the wrapped file.
+    @Override
+    public String toString() {
+        return file.getAbsolutePath();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return o instanceof FileReference && file.equals(((FileReference) o).file);
+    }
+
+    @Override
+    public int hashCode() {
+        return file.hashCode();
+    }
+
+    // Best effort: the boolean result of File.delete() is discarded, so a failed delete is silent.
+    public void delete() {
+        file.delete();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IFileHandle.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IFileHandle.java
new file mode 100644
index 0000000..10fcef0
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IFileHandle.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+/**
+ * Marker type for file handles: it declares no members and exists only so that code
+ * passing file handle objects around is statically typed by the Java compiler.
+ * Users must not implement this interface.
+ */
+public interface IFileHandle {}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOFuture.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOFuture.java
new file mode 100644
index 0000000..45b919f
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOFuture.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Handle of the kind returned by IIOManager's asyncWrite/asyncRead; synchronize() can be interrupted. */
+public interface IIOFuture {
+    int synchronize() throws HyracksDataException, InterruptedException;
+    boolean isComplete();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
new file mode 100644
index 0000000..2e7feb5
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IIOManager.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * File I/O facade: lists I/O devices, opens files as {@link IFileHandle}s, and reads or writes
+ * {@link ByteBuffer}s at explicit offsets, either directly or through an {@link IIOFuture}.
+ */
+public interface IIOManager {
+    /** Access mode passed to {@link #open}. */
+    enum FileReadWriteMode {
+        READ_ONLY,
+        READ_WRITE
+    }
+
+    /** Sync mode passed to {@link #open}; the names pair a metadata policy with a data policy. */
+    enum FileSyncMode {
+        METADATA_SYNC_DATA_SYNC,
+        METADATA_ASYNC_DATA_SYNC,
+        METADATA_ASYNC_DATA_ASYNC
+    }
+
+    /** Returns the I/O devices of this manager. */
+    List<IODeviceHandle> getIODevices();
+
+    /** Opens {@code fileRef} with the requested access and sync modes and returns its handle. */
+    IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode)
+            throws HyracksDataException;
+
+    /** NOTE(review): the returned int is presumably the number of bytes written; confirm. */
+    int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+
+    /** NOTE(review): the returned int is presumably the number of bytes read; confirm. */
+    int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
+
+    /** NOTE(review): presumably the asynchronous form of {@link #syncWrite}; confirm. */
+    IIOFuture asyncWrite(IFileHandle fHandle, long offset, ByteBuffer data);
+
+    /** NOTE(review): presumably the asynchronous form of {@link #syncRead}; confirm. */
+    IIOFuture asyncRead(IFileHandle fHandle, long offset, ByteBuffer data);
+
+    /** Closes {@code fHandle}. */
+    void close(IFileHandle fHandle) throws HyracksDataException;
+
+    /** NOTE(review): presumably forces pending writes, plus metadata when {@code metadata} is set; confirm. */
+    void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IODeviceHandle.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IODeviceHandle.java
new file mode 100644
index 0000000..16ff19c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IODeviceHandle.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import java.io.File;
+import java.io.Serializable;
+
+public class IODeviceHandle implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final File path;
+    private final String workAreaPath;
+
+    /** Keeps {@code path} as given; {@code workAreaPath} is trimmed and loses one trailing separator. */
+    public IODeviceHandle(File path, String workAreaPath) {
+        this.path = path;
+        final String dir = workAreaPath.trim();
+        this.workAreaPath = dir.endsWith(File.separator) ? dir.substring(0, dir.length() - 1) : dir;
+    }
+
+    /** Returns the device path passed to the constructor. */
+    public File getPath() {
+        return path;
+    }
+
+    /** Returns the work area path as normalized by the constructor. */
+    public String getWorkAreaPath() {
+        return workAreaPath;
+    }
+
+    /** Creates a {@link FileReference} from this device handle and {@code relPath}. */
+    public FileReference createFileReference(String relPath) {
+        return new FileReference(this, relPath);
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWorkspaceFileFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWorkspaceFileFactory.java
new file mode 100644
index 0000000..7b9a7c4
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/io/IWorkspaceFileFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.io;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/** Factory for workspace files, each described by a {@link FileReference}. */
+public interface IWorkspaceFileFactory {
+    /** NOTE(review): presumably the caller owns the lifetime of the returned file; confirm. */
+    FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException;
+
+    /** NOTE(review): presumably the factory's owner cleans the returned file up; confirm. */
+    FileReference createManagedWorkspaceFile(String prefix) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
new file mode 100644
index 0000000..6698ff7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public final class ActivityCluster implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final ActivityClusterGraph acg;
+    private final ActivityClusterId id;
+    private final List<IActivity> roots;
+    private final Map<ActivityId, IActivity> activities;
+    private final Map<ConnectorDescriptorId, IConnectorDescriptor> connectors;
+    private final Map<ConnectorDescriptorId, RecordDescriptor> connectorRecordDescriptorMap;
+    // Connectors of each activity indexed by port number; ports not connected (yet) hold null.
+    private final Map<ActivityId, List<IConnectorDescriptor>> activityInputMap;
+    private final Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap;
+    // connector id -> ((producer activity, producer port), (consumer activity, consumer port))
+    private final Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap;
+    private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
+    private final List<ActivityCluster> dependencies;
+    private IConnectorPolicyAssignmentPolicy cpap;
+
+    public ActivityCluster(ActivityClusterGraph acg, ActivityClusterId id) {
+        this.acg = acg;
+        this.id = id;
+        roots = new ArrayList<IActivity>();
+        activities = new HashMap<ActivityId, IActivity>();
+        connectors = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
+        connectorRecordDescriptorMap = new HashMap<ConnectorDescriptorId, RecordDescriptor>();
+        activityInputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+        activityOutputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+        connectorActivityMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
+        blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+        dependencies = new ArrayList<ActivityCluster>();
+    }
+
+    public ActivityClusterGraph getActivityClusterGraph() {
+        return acg;
+    }
+
+    public ActivityClusterId getId() {
+        return id;
+    }
+
+    public void addRoot(IActivity activity) {
+        roots.add(activity);
+    }
+
+    public void addActivity(IActivity activity) {
+        activities.put(activity.getActivityId(), activity);
+    }
+
+    public void addConnector(IConnectorDescriptor connector) {
+        connectors.put(connector.getConnectorId(), connector);
+    }
+
+    /**
+     * Wires {@code connector} from {@code producerPort} of {@code producerActivity} to
+     * {@code consumerPort} of {@code consumerActivity} and records {@code recordDescriptor} for it.
+     * Throws {@link IllegalStateException} unless both activities were added to this cluster.
+     */
+    public void connect(IConnectorDescriptor connector, IActivity producerActivity, int producerPort,
+            IActivity consumerActivity, int consumerPort, RecordDescriptor recordDescriptor) {
+        if (!activities.containsKey(producerActivity.getActivityId())
+                || !activities.containsKey(consumerActivity.getActivityId())) {
+            throw new IllegalStateException("Connected Activities belong to different Activity Clusters: "
+                    + producerActivity.getActivityId() + " and " + consumerActivity.getActivityId());
+        }
+        insertIntoIndexedMap(activityInputMap, consumerActivity.getActivityId(), consumerPort, connector);
+        insertIntoIndexedMap(activityOutputMap, producerActivity.getActivityId(), producerPort, connector);
+        connectorActivityMap.put(
+                connector.getConnectorId(),
+                Pair.<Pair<IActivity, Integer>, Pair<IActivity, Integer>> of(
+                        Pair.<IActivity, Integer> of(producerActivity, producerPort),
+                        Pair.<IActivity, Integer> of(consumerActivity, consumerPort)));
+        connectorRecordDescriptorMap.put(connector.getConnectorId(), recordDescriptor);
+    }
+
+    public List<IActivity> getRoots() {
+        return roots;
+    }
+
+    public Map<ActivityId, IActivity> getActivityMap() {
+        return activities;
+    }
+
+    public Map<ConnectorDescriptorId, IConnectorDescriptor> getConnectorMap() {
+        return connectors;
+    }
+
+    public Map<ConnectorDescriptorId, RecordDescriptor> getConnectorRecordDescriptorMap() {
+        return connectorRecordDescriptorMap;
+    }
+
+    public Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> getConnectorActivityMap() {
+        return connectorActivityMap;
+    }
+
+    public Map<ActivityId, List<IConnectorDescriptor>> getActivityInputMap() {
+        return activityInputMap;
+    }
+
+    public Map<ActivityId, List<IConnectorDescriptor>> getActivityOutputMap() {
+        return activityOutputMap;
+    }
+
+    /** Returns the id of the activity on the consumer side of {@code cdId}, as recorded by connect(). */
+    public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) {
+        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+        return connEdge.getRight().getLeft().getActivityId();
+    }
+
+    /** Returns the id of the activity on the producer side of {@code cdId}, as recorded by connect(). */
+    public ActivityId getProducerActivity(ConnectorDescriptorId cdId) {
+        Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+        return connEdge.getLeft().getLeft().getActivityId();
+    }
+
+    public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() {
+        return blocked2blockerMap;
+    }
+
+    public List<ActivityCluster> getDependencies() {
+        return dependencies;
+    }
+
+    public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+        return cpap;
+    }
+
+    public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy cpap) {
+        this.cpap = cpap;
+    }
+
+    /** Pads {@code list} with nulls until {@code index} is a valid position. */
+    private <T> void extend(List<T> list, int index) {
+        int n = list.size();
+        for (int i = n; i <= index; ++i) {
+            list.add(null);
+        }
+    }
+
+    /** Stores {@code value} at {@code index} of the list kept under {@code key}, creating or padding it. */
+    private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
+        List<V> vList = map.get(key);
+        if (vList == null) {
+            vList = new ArrayList<V>();
+            map.put(key, vList);
+        }
+        extend(vList, index);
+        vList.set(index, value);
+    }
+
+    /** Serializes the cluster's activities; unconnected (null) ports are left out of the port arrays. */
+    public JSONObject toJSON() throws JSONException {
+        JSONArray jans = new JSONArray();
+        for (IActivity an : activities.values()) {
+            ActivityId aid = an.getActivityId();
+            JSONObject jan = new JSONObject();
+            jan.put("id", aid.toString());
+            jan.put("java-class", an.getClass().getName());
+            List<IConnectorDescriptor> inputs = activityInputMap.get(aid);
+            if (inputs != null) {
+                jan.put("inputs", portsToJSON(inputs, "input-port"));
+            }
+
+            List<IConnectorDescriptor> outputs = activityOutputMap.get(aid);
+            if (outputs != null) {
+                jan.put("outputs", portsToJSON(outputs, "output-port"));
+            }
+
+            Set<ActivityId> blockers = blocked2blockerMap.get(aid);
+            if (blockers != null) {
+                JSONArray jDeps = new JSONArray();
+                for (ActivityId blocker : blockers) {
+                    jDeps.put(blocker.toString());
+                }
+                jan.put("depends-on", jDeps);
+            }
+            jans.put(jan);
+        }
+        JSONObject jac = new JSONObject();
+        jac.put("activities", jans);
+        return jac;
+    }
+
+    /** Emits one {portKey, "connector-id"} object per connected port; null padding from extend() is skipped. */
+    private static JSONArray portsToJSON(List<IConnectorDescriptor> ports, String portKey) throws JSONException {
+        JSONArray jPorts = new JSONArray();
+        for (int i = 0; i < ports.size(); ++i) {
+            IConnectorDescriptor connector = ports.get(i);
+            if (connector != null) {
+                JSONObject jPort = new JSONObject();
+                jPort.put(portKey, i);
+                jPort.put("connector-id", connector.getConnectorId().toString());
+                jPorts.put(jPort);
+            }
+        }
+        return jPorts;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
new file mode 100644
index 0000000..32c93cd
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+
+public class ActivityClusterGraph implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private int version;
+    private final Map<ActivityClusterId, ActivityCluster> activityClusterMap;
+    private final Map<ActivityId, ActivityCluster> activityMap;
+    private final Map<ConnectorDescriptorId, ActivityCluster> connectorMap;
+    private int frameSize;
+    private int maxReattempts;
+    private IJobletEventListenerFactory jobletEventListenerFactory;
+    private IGlobalJobDataFactory globalJobDataFactory;
+    private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
+    private boolean useConnectorPolicyForScheduling;
+
+    /** Creates an empty graph at version 0 with a frame size of 32768. */
+    public ActivityClusterGraph() {
+        this.version = 0;
+        this.activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
+        this.activityMap = new HashMap<ActivityId, ActivityCluster>();
+        this.connectorMap = new HashMap<ConnectorDescriptorId, ActivityCluster>();
+        this.frameSize = 32768;
+    }
+
+    /** Returns the live map from activity id to the cluster that contains the activity. */
+    public Map<ActivityId, ActivityCluster> getActivityMap() {
+        return activityMap;
+    }
+
+    /** Returns the live map from connector id to the cluster that contains the connector. */
+    public Map<ConnectorDescriptorId, ActivityCluster> getConnectorMap() {
+        return connectorMap;
+    }
+
+    /** Returns the live map from cluster id to cluster. */
+    public Map<ActivityClusterId, ActivityCluster> getActivityClusterMap() {
+        return activityClusterMap;
+    }
+
+    /**
+     * Registers every given cluster under its id and maps each of its activity and connector
+     * ids to it; the version is incremented once per call, not once per cluster.
+     */
+    public void addActivityClusters(Collection<ActivityCluster> newActivityClusters) {
+        for (ActivityCluster cluster : newActivityClusters) {
+            activityClusterMap.put(cluster.getId(), cluster);
+            for (ActivityId activityId : cluster.getActivityMap().keySet()) {
+                activityMap.put(activityId, cluster);
+            }
+            for (ConnectorDescriptorId connectorId : cluster.getConnectorMap().keySet()) {
+                connectorMap.put(connectorId, cluster);
+            }
+        }
+        version++;
+    }
+
+    /** Returns how many times {@link #addActivityClusters} has been called on this graph. */
+    public int getVersion() {
+        return version;
+    }
+
+    public void setFrameSize(int frameSize) {
+        this.frameSize = frameSize;
+    }
+
+    public int getFrameSize() {
+        return frameSize;
+    }
+
+    public void setMaxReattempts(int maxReattempts) {
+        this.maxReattempts = maxReattempts;
+    }
+
+    public int getMaxReattempts() {
+        return maxReattempts;
+    }
+
+    public IJobletEventListenerFactory getJobletEventListenerFactory() {
+        return jobletEventListenerFactory;
+    }
+
+    public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
+        this.jobletEventListenerFactory = jobletEventListenerFactory;
+    }
+
+    public IGlobalJobDataFactory getGlobalJobDataFactory() {
+        return globalJobDataFactory;
+    }
+
+    public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
+        this.globalJobDataFactory = globalJobDataFactory;
+    }
+
+    public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+        return connectorPolicyAssignmentPolicy;
+    }
+
+    public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy) {
+        this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
+    }
+
+    public boolean isUseConnectorPolicyForScheduling() {
+        return useConnectorPolicyForScheduling;
+    }
+
+    public void setUseConnectorPolicyForScheduling(boolean useConnectorPolicyForScheduling) {
+        this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
+    }
+
+    /** Serializes the version plus the JSON form of every registered cluster. */
+    public JSONObject toJSON() throws JSONException {
+        JSONArray clusters = new JSONArray();
+        for (ActivityCluster cluster : activityClusterMap.values()) {
+            clusters.put(cluster.toJSON());
+        }
+        JSONObject json = new JSONObject();
+        json.put("version", version);
+        json.put("activity-clusters", clusters);
+        return json;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterId.java
new file mode 100644
index 0000000..cba63959
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterId.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+public final class ActivityClusterId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final JobId jobId;
+
+    private final int id;
+
+    /**
+     * Creates an identifier from a job id and an integer id.
+     *
+     * @param jobId
+     *            job id component; {@code null} is tolerated by equals() and hashCode()
+     * @param id
+     *            integer component of the identifier
+     */
+    public ActivityClusterId(JobId jobId, int id) {
+        this.jobId = jobId;
+        this.id = id;
+    }
+
+    /** Returns the job id component. */
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    /** Returns the integer component. */
+    public int getId() {
+        return id;
+    }
+
+    /** Combines {@code id} and the hash of {@code jobId} (0 when null) with the multiplier 31. */
+    @Override
+    public int hashCode() {
+        final int jobHash = (jobId == null) ? 0 : jobId.hashCode();
+        return 31 * (31 + id) + jobHash;
+    }
+
+    /** Equal when {@code obj} has the same class, the same {@code id} and an equal (or also null) job id. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final ActivityClusterId that = (ActivityClusterId) obj;
+        if (id != that.id) {
+            return false;
+        }
+        return (jobId == null) ? that.jobId == null : jobId.equals(that.jobId);
+    }
+
+    /** Renders as {@code ACID:<jobId>:<id>}. */
+    @Override
+    public String toString() {
+        return "ACID:" + jobId + ":" + id;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGenerator.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGenerator.java
new file mode 100644
index 0000000..b837066
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGenerator.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+
+/** Supplies an {@link ActivityClusterGraph} together with a set of {@link Constraint}s. */
+public interface IActivityClusterGraphGenerator {
+    /** Returns the constraints held by this generator. */
+    Set<Constraint> getConstraints();
+
+    /** NOTE(review): presumably builds the initial graph for the job; confirm with implementations. */
+    ActivityClusterGraph initialize();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
new file mode 100644
index 0000000..d801dd1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+/** Serializable factory for {@link IActivityClusterGraphGenerator}s. */
+public interface IActivityClusterGraphGeneratorFactory extends Serializable {
+    /** Creates a generator from the application name, job id, {@code ccAppCtx} and job flags. */
+    IActivityClusterGraphGenerator createActivityClusterGraphGenerator(String appName, JobId jobId,
+            ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IConnectorDescriptorRegistry.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IConnectorDescriptorRegistry.java
new file mode 100644
index 0000000..65b27e3
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IConnectorDescriptorRegistry.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+
+/** Hands out {@link ConnectorDescriptorId}s for connector descriptors. */
+public interface IConnectorDescriptorRegistry {
+    /** NOTE(review): presumably registers {@code conn} and returns the id assigned to it; confirm. */
+    ConnectorDescriptorId createConnectorDescriptor(IConnectorDescriptor conn);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IGlobalJobDataFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IGlobalJobDataFactory.java
new file mode 100644
index 0000000..bee495a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IGlobalJobDataFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+
+/** Serializable factory for an untyped "global job data" object. */
+public interface IGlobalJobDataFactory extends Serializable {
+    /** NOTE(review): presumably called once per joblet to create data shared by its tasks; confirm. */
+    Object createGlobalJobData(IHyracksJobletContext ctx);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
new file mode 100644
index 0000000..2f0f025
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobLifecycleListener.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+
+/** Callbacks for the creation, start and finish of a job, each identified by its {@link JobId}. */
+public interface IJobLifecycleListener {
+    /** Notifies that the job was created from the given graph generator factory. */
+    void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException;
+
+    /** Notifies that the job has started. */
+    void notifyJobStart(JobId jobId) throws HyracksException;
+
+    /** Notifies that the job has finished. NOTE(review): presumably also on failure; confirm. */
+    void notifyJobFinish(JobId jobId) throws HyracksException;
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
new file mode 100644
index 0000000..c83e333
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+/** Callbacks for the start and finish of a joblet. */
+public interface IJobletEventListener {
+    /** Notifies that the joblet has started. */
+    void jobletStart();
+
+    /** Notifies that the joblet has finished with the given status. */
+    void jobletFinish(JobStatus status);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java
new file mode 100644
index 0000000..3b3cacb
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IJobletEventListenerFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
+
+public interface IJobletEventListenerFactory extends Serializable { // serializable source of joblet listeners
+    IJobletEventListener createListener(IHyracksJobletContext ctx); // ctx: the joblet context handed to the factory
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorDescriptorRegistry.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorDescriptorRegistry.java
new file mode 100644
index 0000000..4ea84d6
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorDescriptorRegistry.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+
+public interface IOperatorDescriptorRegistry { // source of operator descriptor ids
+    OperatorDescriptorId createOperatorDescriptorId(IOperatorDescriptor op); // returns the id created for op
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
new file mode 100644
index 0000000..eaa1ec2
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IOperatorEnvironment.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+
+/** Store of {@link IStateObject}s: one is saved with setStateObject and one is looked up by id with getStateObject. */
+public interface IOperatorEnvironment {
+    void setStateObject(IStateObject taskState);
+    IStateObject getStateObject(Object id); // NOTE(review): presumably id is an id carried by a stored object - confirm
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
new file mode 100644
index 0000000..5b62b5c
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Serializable holder for the activity-level view of a job: activities, connectors, and the maps linking them.
+ * Every map starts out empty and the getters expose the live maps, so the graph is populated through them.
+ */
+public class JobActivityGraph implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final Map<ActivityId, IActivity> activityMap;
+    private final Map<ConnectorDescriptorId, IConnectorDescriptor> connectorMap;
+    private final Map<ConnectorDescriptorId, RecordDescriptor> connectorRecordDescriptorMap;
+    private final Map<ActivityId, List<IConnectorDescriptor>> activityInputMap;
+    private final Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap;
+    // Connector edge: left = producer side, right = consumer side; each is an activity plus an Integer (presumably a port).
+    private final Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap;
+    // Going by the field name: keyed by the blocked activity, valued with the activities that block it.
+    private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
+
+    public JobActivityGraph() {
+        activityMap = new HashMap<ActivityId, IActivity>();
+        connectorMap = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
+        connectorRecordDescriptorMap = new HashMap<ConnectorDescriptorId, RecordDescriptor>();
+        activityInputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+        activityOutputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+        connectorActivityMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
+        blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+    }
+
+    public Map<ActivityId, IActivity> getActivityMap() {
+        return activityMap;
+    }
+
+    public Map<ConnectorDescriptorId, IConnectorDescriptor> getConnectorMap() {
+        return connectorMap;
+    }
+
+    public Map<ConnectorDescriptorId, RecordDescriptor> getConnectorRecordDescriptorMap() {
+        return connectorRecordDescriptorMap;
+    }
+
+    public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() {
+        return blocked2blockerMap;
+    }
+
+    public Map<ActivityId, List<IConnectorDescriptor>> getActivityInputMap() {
+        return activityInputMap;
+    }
+
+    public Map<ActivityId, List<IConnectorDescriptor>> getActivityOutputMap() {
+        return activityOutputMap;
+    }
+
+    public Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> getConnectorActivityMap() {
+        return connectorActivityMap;
+    }
+
+    /** Id of the activity on the consumer (right) side of connector cdId; NullPointerException if cdId has no edge. */
+    public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) {
+        return connectorActivityMap.get(cdId).getRight().getLeft().getActivityId();
+    }
+
+    /** Id of the activity on the producer (left) side of connector cdId; NullPointerException if cdId has no edge. */
+    public ActivityId getProducerActivity(ConnectorDescriptorId cdId) {
+        return connectorActivityMap.get(cdId).getLeft().getLeft().getActivityId();
+    }
+
+    /** Two-line dump: the activity map, then the blocked-to-blocker map. */
+    @Override
+    public String toString() {
+        StringBuilder buffer = new StringBuilder();
+        // The second label names the direction of the map actually printed (blocked activity -> its blockers).
+        buffer.append("ActivityNodes: ").append(activityMap).append('\n');
+        buffer.append("Blocked->Blocker: ").append(blocked2blockerMap).append('\n');
+        return buffer.toString();
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java
new file mode 100644
index 0000000..2b9787d
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobFlag.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+public enum JobFlag { // job flag constants; a single one is defined
+    PROFILE_RUNTIME, // NOTE(review): presumably turns on runtime profiling for a job - confirm at the use sites
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
new file mode 100644
index 0000000..ffb9bba
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobId.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+
+/** Immutable, serializable job identifier wrapping a long; its text form is "JID:" followed by the number. */
+public final class JobId implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final String PREFIX = "JID:";
+
+    private final long id;
+
+    public JobId(long id) {
+        this.id = id;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        // Fold the high word in (same formula as Long.hashCode) instead of truncating; ids in 0..2^31-1 hash as before.
+        return (int) (id ^ (id >>> 32));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return o == this || (o instanceof JobId && ((JobId) o).id == id);
+    }
+
+    @Override
+    public String toString() {
+        return PREFIX + id;
+    }
+
+    /**
+     * Parses the text form produced by {@link #toString()}.
+     * @throws IllegalArgumentException if str is null, lacks the "JID:" prefix, or its remainder is not a long
+     */
+    public static JobId parse(String str) {
+        if (str == null || !str.startsWith(PREFIX)) {
+            throw new IllegalArgumentException("Not a JobId string (expected \"JID:<number>\"): " + str);
+        }
+        return new JobId(Long.parseLong(str.substring(PREFIX.length())));
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
new file mode 100644
index 0000000..7c523f1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Serializable description of a job: its operators, the connectors wired between operator ports, user
+ * constraints, free-form properties and a few execution settings. It also issues operator and connector
+ * ids, recording each descriptor under the id it hands out.
+ */
+public class JobSpecification implements Serializable, IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
+    private static final long serialVersionUID = 1L;
+
+    private final List<OperatorDescriptorId> roots;
+    private final Map<OperatorDescriptorId, IOperatorDescriptor> opMap;
+    private final Map<ConnectorDescriptorId, IConnectorDescriptor> connMap;
+    // Per operator, connectors indexed by port number; a port that was never connected holds null.
+    private final Map<OperatorDescriptorId, List<IConnectorDescriptor>> opInputMap;
+    private final Map<OperatorDescriptorId, List<IConnectorDescriptor>> opOutputMap;
+    // Connector edge: left = (producer operator, output port), right = (consumer operator, input port).
+    private final Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> connectorOpMap;
+    private final Map<String, Serializable> properties;
+    private final Set<Constraint> userConstraints;
+
+    private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
+    private int frameSize;
+    private int maxReattempts;
+    private IJobletEventListenerFactory jobletEventListenerFactory;
+    private IGlobalJobDataFactory globalJobDataFactory;
+    private boolean useConnectorPolicyForScheduling;
+
+    // Id generators. Being transient they are not serialized, so a deserialized spec counts from 0 again.
+    private transient int operatorIdCounter;
+    private transient int connectorIdCounter;
+
+    /** Creates an empty specification: frame size 32768, 2 max reattempts, connector-policy scheduling enabled. */
+    public JobSpecification() {
+        roots = new ArrayList<OperatorDescriptorId>();
+        opMap = new HashMap<OperatorDescriptorId, IOperatorDescriptor>();
+        connMap = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
+        opInputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
+        opOutputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
+        connectorOpMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>>();
+        properties = new HashMap<String, Serializable>();
+        userConstraints = new HashSet<Constraint>();
+        operatorIdCounter = 0;
+        connectorIdCounter = 0;
+        frameSize = 32768;
+        maxReattempts = 2;
+        useConnectorPolicyForScheduling = true;
+    }
+
+    /** Issues the next operator id and records op under it in the operator map. */
+    @Override
+    public OperatorDescriptorId createOperatorDescriptorId(IOperatorDescriptor op) {
+        OperatorDescriptorId odId = new OperatorDescriptorId(operatorIdCounter++);
+        opMap.put(odId, op);
+        return odId;
+    }
+
+    /** Issues the next connector id and records conn under it in the connector map. */
+    @Override
+    public ConnectorDescriptorId createConnectorDescriptor(IConnectorDescriptor conn) {
+        ConnectorDescriptorId cdId = new ConnectorDescriptorId(connectorIdCounter++);
+        connMap.put(cdId, conn);
+        return cdId;
+    }
+
+    /** Registers op as a root of the job; only its operator id is stored. */
+    public void addRoot(IOperatorDescriptor op) {
+        roots.add(op.getOperatorId());
+    }
+
+    /**
+     * Wires output port producerPort of producerOp to input port consumerPort of consumerOp through conn.
+     * A port list is padded with null up to the requested index; reusing a port replaces its earlier connector.
+     */
+    public void connect(IConnectorDescriptor conn, IOperatorDescriptor producerOp, int producerPort,
+            IOperatorDescriptor consumerOp, int consumerPort) {
+        insertIntoIndexedMap(opInputMap, consumerOp.getOperatorId(), consumerPort, conn);
+        insertIntoIndexedMap(opOutputMap, producerOp.getOperatorId(), producerPort, conn);
+        Pair<IOperatorDescriptor, Integer> producerEnd = Pair.<IOperatorDescriptor, Integer> of(producerOp, producerPort);
+        Pair<IOperatorDescriptor, Integer> consumerEnd = Pair.<IOperatorDescriptor, Integer> of(consumerOp, consumerPort);
+        connectorOpMap.put(conn.getConnectorId(), Pair.of(producerEnd, consumerEnd));
+    }
+
+    public void setProperty(String name, Serializable value) {
+        properties.put(name, value);
+    }
+
+    public Serializable getProperty(String name) {
+        return properties.get(name);
+    }
+
+    public Map<ConnectorDescriptorId, IConnectorDescriptor> getConnectorMap() {
+        return connMap;
+    }
+
+    public Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> getConnectorOperatorMap() {
+        return connectorOpMap;
+    }
+
+    /** Record descriptor of the producer output port feeding conn; conn must already have been passed to connect(). */
+    public RecordDescriptor getConnectorRecordDescriptor(IConnectorDescriptor conn) {
+        Pair<IOperatorDescriptor, Integer> producerEnd = edgeOf(conn).getLeft();
+        return producerEnd.getLeft().getOutputRecordDescriptors()[producerEnd.getRight()];
+    }
+
+    public IOperatorDescriptor getConsumer(IConnectorDescriptor conn) {
+        return edgeOf(conn).getRight().getLeft();
+    }
+
+    public int getConsumerInputIndex(IConnectorDescriptor conn) {
+        return edgeOf(conn).getRight().getRight();
+    }
+
+    public IConnectorDescriptor getInputConnectorDescriptor(IOperatorDescriptor op, int inputIndex) {
+        return getInputConnectorDescriptor(op.getOperatorId(), inputIndex);
+    }
+
+    /** Connector on input port inputIndex of the operator; the operator must already have an input list from connect(). */
+    public IConnectorDescriptor getInputConnectorDescriptor(OperatorDescriptorId odId, int inputIndex) {
+        return opInputMap.get(odId).get(inputIndex);
+    }
+
+    public Map<OperatorDescriptorId, List<IConnectorDescriptor>> getOperatorInputMap() {
+        return opInputMap;
+    }
+
+    public RecordDescriptor getOperatorInputRecordDescriptor(OperatorDescriptorId odId, int inputIndex) {
+        return getConnectorRecordDescriptor(getInputConnectorDescriptor(odId, inputIndex));
+    }
+
+    public Map<OperatorDescriptorId, IOperatorDescriptor> getOperatorMap() {
+        return opMap;
+    }
+
+    public Map<OperatorDescriptorId, List<IConnectorDescriptor>> getOperatorOutputMap() {
+        return opOutputMap;
+    }
+
+    public RecordDescriptor getOperatorOutputRecordDescriptor(OperatorDescriptorId odId, int outputIndex) {
+        return getConnectorRecordDescriptor(getOutputConnectorDescriptor(odId, outputIndex));
+    }
+
+    public IConnectorDescriptor getOutputConnectorDescriptor(IOperatorDescriptor op, int outputIndex) {
+        return getOutputConnectorDescriptor(op.getOperatorId(), outputIndex);
+    }
+
+    /** Connector on output port outputIndex of the operator; the operator must already have an output list from connect(). */
+    public IConnectorDescriptor getOutputConnectorDescriptor(OperatorDescriptorId odId, int outputIndex) {
+        return opOutputMap.get(odId).get(outputIndex);
+    }
+
+    public IOperatorDescriptor getProducer(IConnectorDescriptor conn) {
+        return edgeOf(conn).getLeft().getLeft();
+    }
+
+    public int getProducerOutputIndex(IConnectorDescriptor conn) {
+        return edgeOf(conn).getLeft().getRight();
+    }
+
+    public List<OperatorDescriptorId> getRoots() {
+        return roots;
+    }
+
+    public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+        return connectorPolicyAssignmentPolicy;
+    }
+
+    public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy) {
+        this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
+    }
+
+    public void setFrameSize(int frameSize) {
+        this.frameSize = frameSize;
+    }
+
+    public int getFrameSize() {
+        return frameSize;
+    }
+
+    public void setMaxReattempts(int maxReattempts) {
+        this.maxReattempts = maxReattempts;
+    }
+
+    public int getMaxReattempts() {
+        return maxReattempts;
+    }
+
+    public void addUserConstraint(Constraint constraint) {
+        userConstraints.add(constraint);
+    }
+
+    public Set<Constraint> getUserConstraints() {
+        return userConstraints;
+    }
+
+    public IJobletEventListenerFactory getJobletEventListenerFactory() {
+        return jobletEventListenerFactory;
+    }
+
+    public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
+        this.jobletEventListenerFactory = jobletEventListenerFactory;
+    }
+
+    public IGlobalJobDataFactory getGlobalJobDataFactory() {
+        return globalJobDataFactory;
+    }
+
+    public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
+        this.globalJobDataFactory = globalJobDataFactory;
+    }
+
+    public boolean isUseConnectorPolicyForScheduling() {
+        return useConnectorPolicyForScheduling;
+    }
+
+    public void setUseConnectorPolicyForScheduling(boolean useConnectorPolicyForScheduling) {
+        this.useConnectorPolicyForScheduling = useConnectorPolicyForScheduling;
+    }
+
+    /** Edge recorded for conn by connect(), or null when conn was never connected. */
+    private Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> edgeOf(IConnectorDescriptor conn) {
+        return connectorOpMap.get(conn.getConnectorId());
+    }
+
+    /** Stores value at position index of the list kept under key, creating and null-padding the list as needed. */
+    private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
+        List<V> vList = map.get(key);
+        if (vList == null) {
+            vList = new ArrayList<V>();
+            map.put(key, vList);
+        }
+        while (vList.size() <= index) {
+            vList.add(null);
+        }
+        vList.set(index, value);
+    }
+
+    /** Appends a titled listing of one operator's port connectors; null (never connected) ports are skipped. */
+    private static void appendConnectors(StringBuilder buffer, String title, List<IConnectorDescriptor> connectors) {
+        if (connectors == null || connectors.isEmpty()) {
+            return;
+        }
+        buffer.append(title);
+        for (IConnectorDescriptor c : connectors) {
+            if (c != null) {
+                buffer.append("      ").append(c.getConnectorId().getId()).append(" : ").append(c.toString())
+                        .append("\n");
+            }
+        }
+    }
+
+    /** Lists every operator with its input and output connectors, followed by the user constraints. */
+    @Override
+    public String toString() {
+        StringBuilder buffer = new StringBuilder();
+        for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : opMap.entrySet()) {
+            buffer.append(e.getKey().getId()).append(" : ").append(e.getValue().toString()).append("\n");
+            appendConnectors(buffer, "   Inputs:\n", opInputMap.get(e.getKey()));
+            appendConnectors(buffer, "   Outputs:\n", opOutputMap.get(e.getKey()));
+        }
+        buffer.append("\n").append("Constraints:\n").append(userConstraints);
+        return buffer.toString();
+    }
+
+    /**
+     * Renders the job as JSON with an "operators" array and a "connectors" array. Each connector entry carries
+     * its own JSON under "connector" and, once connected, the producer end as "in-operator-id"/"in-operator-port"
+     * and the consumer end as "out-operator-id"/"out-operator-port".
+     */
+    public JSONObject toJSON() throws JSONException {
+        JSONObject jjob = new JSONObject();
+
+        JSONArray jopArray = new JSONArray();
+        for (IOperatorDescriptor op : opMap.values()) {
+            jopArray.put(op.toJSON());
+        }
+        jjob.put("operators", jopArray);
+
+        JSONArray jcArray = new JSONArray();
+        for (Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> e : connMap.entrySet()) {
+            JSONObject conn = new JSONObject();
+            Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connection = connectorOpMap
+                    .get(e.getKey());
+            if (connection != null) {
+                conn.put("in-operator-id", connection.getLeft().getLeft().getOperatorId().toString());
+                conn.put("in-operator-port", connection.getLeft().getRight().intValue());
+                conn.put("out-operator-id", connection.getRight().getLeft().getOperatorId().toString());
+                conn.put("out-operator-port", connection.getRight().getRight().intValue());
+            }
+            conn.put("connector", e.getValue().toJSON());
+            jcArray.put(conn);
+        }
+        jjob.put("connectors", jcArray);
+
+        return jjob;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobStatus.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobStatus.java
new file mode 100644
index 0000000..aa6fb68
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobStatus.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job;
+
+public enum JobStatus { // job status constants; declaration order fixes their ordinals
+    INITIALIZED,
+    RUNNING,
+    TERMINATED,
+    FAILURE; // explicit terminator in place of the dangling comma
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
new file mode 100644
index 0000000..1ca0ac7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounter.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job.profiling.counters;
+
+/**
+ * A named counter holding a long value that can be read, overwritten, or adjusted by a delta.
+ * Implementations are not visible here, so this interface by itself implies no thread-safety guarantee.
+ */
+public interface ICounter {
+    /** @return the fully-qualified name of this counter. */
+    String getName();
+
+    /**
+     * Changes the counter to its current value plus delta.
+     *
+     * @param delta
+     *            amount to change the counter value by.
+     * @return the value after the update.
+     */
+    long update(long delta);
+
+    /**
+     * Overwrites the value of the counter.
+     *
+     * @param value
+     *            the new value of the counter.
+     * @return the value held before this call.
+     */
+    long set(long value);
+
+    /**
+     * Reads the counter.
+     *
+     * @return the current value of the counter.
+     */
+    long get();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounterContext.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounterContext.java
new file mode 100644
index 0000000..4be02d7
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/profiling/counters/ICounterContext.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.job.profiling.counters;
+
+/**
+ * A namespace of named counters: counters are looked up (and optionally created) by name.
+ *
+ * @author vinayakb
+ */
+public interface ICounterContext {
+    /**
+     * Looks up the counter registered under the given name.
+     *
+     * @param name
+     *            name of the counter to look up.
+     * @param create
+     *            whether to create the counter when none exists under that name.
+     * @return the existing counter with that name if there is one; otherwise a newly created
+     *         counter when create is <code>true</code>, or <code>null</code> when create is
+     *         <code>false</code>.
+     */
+    ICounter getCounter(String name, boolean create);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java
new file mode 100644
index 0000000..96ad7fe
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessage.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.messages;
+
+import java.io.Serializable;
+
+/**
+ * Marker type for messages: it declares no methods and only requires implementations to be Serializable.
+ *
+ * @author rico
+ */
+public interface IMessage extends Serializable {
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java
new file mode 100644
index 0000000..35ae813
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/messages/IMessageBroker.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.messages;
+
+/**
+ * Callback interface through which an {@link IMessage} is handed over for processing.
+ *
+ * @author rico
+ */
+public interface IMessageBroker {
+    /**
+     * Invoked with a message and a node id.
+     *
+     * @param message
+     *            - The message that was received.
+     * @param nodeId
+     *            - Id of the node associated with the message (presumably its sender; confirm against callers).
+     */
+    void receivedMessage(IMessage message, String nodeId);
+}
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java
new file mode 100644
index 0000000..c3dfad0
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.partitions;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.resources.IDeallocatable;
+
+/**
+ * A deallocatable partition that is tied to a task context and can be written to an
+ * {@link IFrameWriter}.
+ */
+public interface IPartition extends IDeallocatable {
+    /**
+     * @return The task context of this partition.
+     */
+    IHyracksTaskContext getTaskContext();
+
+    /**
+     * Writes this partition to the given writer.
+     *
+     * @param writer
+     *            - The writer that receives the partition.
+     */
+    void writeTo(IFrameWriter writer);
+
+    /**
+     * @return <code>true</code> if this partition is reusable, or else <code>false</code>.
+     */
+    boolean isReusable();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
new file mode 100644
index 0000000..be25d12
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/PartitionId.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.partitions;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobId;
+
+/**
+ * Immutable, serializable identifier made up of a job id, a connector descriptor id, a sender
+ * index and a receiver index. Equality and hashing take all four components into account.
+ */
+public final class PartitionId implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final JobId jobId;
+
+    private final ConnectorDescriptorId cdId;
+
+    private final int senderIndex;
+
+    private final int receiverIndex;
+
+    /**
+     * @param jobId
+     *            - Job component of the id.
+     * @param cdId
+     *            - Connector descriptor component of the id.
+     * @param senderIndex
+     *            - Sender index component of the id.
+     * @param receiverIndex
+     *            - Receiver index component of the id.
+     */
+    public PartitionId(JobId jobId, ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
+        this.jobId = jobId;
+        this.cdId = cdId;
+        this.senderIndex = senderIndex;
+        this.receiverIndex = receiverIndex;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+
+    public ConnectorDescriptorId getConnectorDescriptorId() {
+        return cdId;
+    }
+
+    public int getSenderIndex() {
+        return senderIndex;
+    }
+
+    public int getReceiverIndex() {
+        return receiverIndex;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((cdId == null) ? 0 : cdId.hashCode());
+        result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
+        result = prime * result + receiverIndex;
+        result = prime * result + senderIndex;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        PartitionId other = (PartitionId) obj;
+        return nullSafeEquals(cdId, other.cdId) && nullSafeEquals(jobId, other.jobId)
+                && receiverIndex == other.receiverIndex && senderIndex == other.senderIndex;
+    }
+
+    /**
+     * Null-tolerant equality for the object components (java.util.Objects is not available at source level 1.6).
+     */
+    private static boolean nullSafeEquals(Object a, Object b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+    /**
+     * @return The components joined with ':' in the order jobId:cdId:senderIndex:receiverIndex.
+     *         A <code>null</code> job id is rendered as "null" instead of raising a
+     *         NullPointerException, in line with the null tolerance of equals and hashCode.
+     */
+    @Override
+    public String toString() {
+        return jobId + ":" + cdId + ":" + senderIndex + ":" + receiverIndex;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/resources/IDeallocatable.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/resources/IDeallocatable.java
new file mode 100644
index 0000000..e5b3ed8
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/resources/IDeallocatable.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+/**
+ * Implemented by objects that can be deallocated. What deallocation entails is left to the
+ * implementation.
+ */
+public interface IDeallocatable {
+    /**
+     * Deallocates this object.
+     */
+    void deallocate();
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/resources/IDeallocatableRegistry.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/resources/IDeallocatableRegistry.java
new file mode 100644
index 0000000..e130607
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/resources/IDeallocatableRegistry.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+/**
+ * A registry that accepts {@link IDeallocatable} objects.
+ */
+public interface IDeallocatableRegistry {
+    /**
+     * Registers a deallocatable with this registry.
+     *
+     * @param deallocatable
+     *            - The object to register.
+     */
+    void registerDeallocatable(IDeallocatable deallocatable);
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/ClusterTopology.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/ClusterTopology.java
new file mode 100644
index 0000000..6f566d1
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/ClusterTopology.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable holder of the {@link NetworkSwitch} that sits at the top of a cluster topology.
+ */
+public class ClusterTopology implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final NetworkSwitch clusterSwitch;
+
+    /**
+     * @param clusterSwitch
+     *            - The switch that this topology wraps.
+     */
+    public ClusterTopology(NetworkSwitch clusterSwitch) {
+        this.clusterSwitch = clusterSwitch;
+    }
+
+    /**
+     * @return The switch supplied at construction time.
+     */
+    public NetworkSwitch getClusterSwitch() {
+        return this.clusterSwitch;
+    }
+
+    /**
+     * Looks up a terminal by name by delegating to the cluster switch.
+     *
+     * @param terminalName
+     *            - Name of the terminal to find.
+     * @param path
+     *            - List to which the cluster switch's lookup appends port indexes.
+     * @return The result of the cluster switch's lookup: <code>true</code> if the terminal was
+     *         found, or else <code>false</code>.
+     */
+    public boolean lookupNetworkTerminal(String terminalName, List<Integer> path) {
+        final boolean found = clusterSwitch.lookupNetworkTerminal(terminalName, path);
+        return found;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkEndpoint.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkEndpoint.java
new file mode 100644
index 0000000..0aaee2a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkEndpoint.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Base class for the named, property-carrying endpoints of a network topology. Concrete
+ * subclasses report which {@link EndpointType} they are through {@link #getType()}.
+ */
+public abstract class NetworkEndpoint implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The kinds of endpoint: a switch or a terminal.
+     */
+    public enum EndpointType {
+        NETWORK_SWITCH,
+        NETWORK_TERMINAL,
+    }
+
+    protected final String name;
+
+    protected final Map<String, String> properties;
+
+    /**
+     * @param name
+     *            - Name of this endpoint.
+     * @param properties
+     *            - Properties of this endpoint. The map is kept as-is, not copied.
+     */
+    public NetworkEndpoint(String name, Map<String, String> properties) {
+        this.name = name;
+        this.properties = properties;
+    }
+
+    /**
+     * @return The kind of endpoint implemented by the concrete subclass.
+     */
+    public abstract EndpointType getType();
+
+    /**
+     * @return The name given at construction time.
+     */
+    public final String getName() {
+        return this.name;
+    }
+
+    /**
+     * @return The very map given at construction time (not a copy).
+     */
+    public final Map<String, String> getProperties() {
+        return this.properties;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkSwitch.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkSwitch.java
new file mode 100644
index 0000000..3c89044
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkSwitch.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An endpoint that owns an array of {@link Port}s, each of which leads to another endpoint (a
+ * terminal or a nested switch). At construction time every terminal name reachable through a
+ * port is recorded together with the index of that port.
+ */
+public class NetworkSwitch extends NetworkEndpoint {
+    private static final long serialVersionUID = 1L;
+
+    private final Port[] ports;
+
+    private final Map<String, Integer> terminalNamePortIndexMap;
+
+    /**
+     * @param name
+     *            - Name of this switch.
+     * @param properties
+     *            - Properties of this switch.
+     * @param ports
+     *            - Ports of this switch. The array is kept as-is, not copied.
+     */
+    public NetworkSwitch(String name, Map<String, String> properties, Port[] ports) {
+        super(name, properties);
+        this.ports = ports;
+        terminalNamePortIndexMap = new HashMap<String, Integer>();
+        int position = 0;
+        for (Port port : ports) {
+            final NetworkEndpoint endpoint = port.getEndpoint();
+            final Integer portIndex = Integer.valueOf(position++);
+            switch (endpoint.getType()) {
+                case NETWORK_SWITCH:
+                    // Terminals known to the nested switch are all reached through this port.
+                    final Map<String, Integer> nested = ((NetworkSwitch) endpoint).terminalNamePortIndexMap;
+                    for (String terminalName : nested.keySet()) {
+                        terminalNamePortIndexMap.put(terminalName, portIndex);
+                    }
+                    break;
+
+                case NETWORK_TERMINAL:
+                    terminalNamePortIndexMap.put(((NetworkTerminal) endpoint).getName(), portIndex);
+                    break;
+            }
+        }
+    }
+
+    /**
+     * @return The very array of ports given at construction time (not a copy).
+     */
+    public Port[] getPorts() {
+        return ports;
+    }
+
+    @Override
+    public EndpointType getType() {
+        return EndpointType.NETWORK_SWITCH;
+    }
+
+    /**
+     * Records, in <code>path</code>, the port index to follow at this switch and at each nested
+     * switch below it in order to reach the named terminal.
+     *
+     * @param terminalName
+     *            - Name of the terminal to find.
+     * @param path
+     *            - List to which the port indexes are appended.
+     * @return <code>true</code> if this switch knows the terminal, or else <code>false</code> (in
+     *         which case nothing is appended to path).
+     */
+    boolean lookupNetworkTerminal(String terminalName, List<Integer> path) {
+        // The constructor only ever stores non-null indexes, so null means "unknown terminal".
+        final Integer portIndex = terminalNamePortIndexMap.get(terminalName);
+        if (portIndex == null) {
+            return false;
+        }
+        path.add(portIndex);
+        final NetworkEndpoint next = ports[portIndex.intValue()].getEndpoint();
+        if (next.getType() == EndpointType.NETWORK_SWITCH) {
+            ((NetworkSwitch) next).lookupNetworkTerminal(terminalName, path);
+        }
+        return true;
+    }
+
+    /**
+     * Follows <code>path</code> from this switch, beginning at <code>stepIndex</code>, and appends
+     * each port visited to <code>portList</code>.
+     *
+     * @param path
+     *            - Port index to take at each successive switch.
+     * @param stepIndex
+     *            - Position in path that applies to this switch.
+     * @param portList
+     *            - List to which the visited ports are appended.
+     * @throws IllegalArgumentException
+     *             - If path has further steps after a port that does not lead to a switch.
+     */
+    void getPortList(List<Integer> path, int stepIndex, List<Port> portList) {
+        if (stepIndex >= path.size()) {
+            return;
+        }
+        final Port port = ports[path.get(stepIndex)];
+        portList.add(port);
+        final int nextStep = stepIndex + 1;
+        if (nextStep >= path.size()) {
+            return;
+        }
+        final NetworkEndpoint endpoint = port.getEndpoint();
+        if (endpoint.getType() != EndpointType.NETWORK_SWITCH) {
+            throw new IllegalArgumentException("Path provided, " + path + ", longer than depth of topology tree");
+        }
+        ((NetworkSwitch) endpoint).getPortList(path, nextStep, portList);
+    }
+
+    /**
+     * A port of a switch; it holds the endpoint that the port leads to.
+     */
+    public static class Port implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final NetworkEndpoint endpoint;
+
+        public Port(NetworkEndpoint endpoint) {
+            this.endpoint = endpoint;
+        }
+
+        public NetworkEndpoint getEndpoint() {
+            return endpoint;
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkTerminal.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkTerminal.java
new file mode 100644
index 0000000..f36da76
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/NetworkTerminal.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.util.Map;
+
+/**
+ * A {@link NetworkEndpoint} of type {@link EndpointType#NETWORK_TERMINAL}; it adds nothing to the
+ * name and properties inherited from its base class.
+ */
+public class NetworkTerminal extends NetworkEndpoint {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param name
+     *            - Name of this terminal.
+     * @param properties
+     *            - Properties of this terminal.
+     */
+    public NetworkTerminal(String name, Map<String, String> properties) {
+        super(name, properties);
+    }
+
+    /**
+     * @return Always {@link EndpointType#NETWORK_TERMINAL}.
+     */
+    @Override
+    public EndpointType getType() {
+        return EndpointType.NETWORK_TERMINAL;
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/TopologyDefinitionParser.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/TopologyDefinitionParser.java
new file mode 100644
index 0000000..0ca6127
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/topology/TopologyDefinitionParser.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.topology;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import org.xml.sax.Attributes;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.DefaultHandler;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+import edu.uci.ics.hyracks.api.topology.NetworkEndpoint.EndpointType;
+
+/**
+ * SAX-based parser that builds a {@link ClusterTopology} from an XML topology definition.
+ * <p>
+ * The elements interpreted are <code>cluster-topology</code> (the root),
+ * <code>network-switch</code> and <code>terminal</code> (each needs a <code>name</code>
+ * attribute) and <code>property</code> (needs <code>name</code> and <code>value</code>
+ * attributes). The root has to contain exactly one endpoint, and it has to be a
+ * <code>network-switch</code>.
+ */
+public class TopologyDefinitionParser {
+    private final Stack<ElementStackEntry> stack;
+
+    private boolean inPropertyElement;
+
+    private TopologyDefinitionParser() {
+        stack = new Stack<ElementStackEntry>();
+        inPropertyElement = false;
+    }
+
+    /**
+     * Parses a topology definition.
+     *
+     * @param in
+     *            - Source of the XML document.
+     * @return The topology described by the document.
+     * @throws IOException
+     *             - If thrown by the underlying XML reader.
+     * @throws SAXException
+     *             - If thrown by the underlying XML reader.
+     * @throws IllegalStateException
+     *             - If elements lack required attributes, are improperly nested or misplaced.
+     * @throws IllegalArgumentException
+     *             - If the root does not contain exactly one network-switch.
+     */
+    public static ClusterTopology parse(InputSource in) throws IOException, SAXException {
+        TopologyDefinitionParser parser = new TopologyDefinitionParser();
+        return parser.parseInternal(in);
+    }
+
+    private ClusterTopology parseInternal(InputSource in) throws IOException, SAXException {
+        // NOTE(review): the reader runs with its default configuration; if definitions can come
+        // from untrusted sources, DTD/external-entity processing should be restricted.
+        XMLReader parser = XMLReaderFactory.createXMLReader();
+        parser.setContentHandler(new SAXContentHandler());
+        parser.parse(in);
+        if (stack.size() != 1) {
+            throw new IllegalStateException("Malformed topology definition");
+        }
+        ElementStackEntry e = stack.pop();
+        if (e.ports.size() != 1) {
+            throw new IllegalArgumentException("Malformed topology definition");
+        }
+        NetworkEndpoint endpoint = e.ports.get(0).getEndpoint();
+        if (endpoint.getType() != EndpointType.NETWORK_SWITCH) {
+            throw new IllegalArgumentException("Top level content in cluster-topology must be network-switch");
+        }
+        return new ClusterTopology((NetworkSwitch) endpoint);
+    }
+
+    /**
+     * Rejects an element met while the stack is empty, i.e. before any cluster-topology element.
+     */
+    private void requireEnclosingElement(String localName) {
+        if (stack.isEmpty()) {
+            throw new IllegalStateException("Encountered " + localName + " element outside cluster-topology");
+        }
+    }
+
+    private class SAXContentHandler extends DefaultHandler {
+        @Override
+        public void endElement(String uri, String localName, String qName) throws SAXException {
+            if ("network-switch".equals(localName) || "terminal".equals(localName)) {
+                // startElement only pushes on a non-empty stack, so a parent entry remains after the pop.
+                ElementStackEntry e = stack.pop();
+                NetworkEndpoint endpoint;
+                if (e.type == EndpointType.NETWORK_SWITCH) {
+                    endpoint = new NetworkSwitch(e.name, e.properties,
+                            e.ports.toArray(new NetworkSwitch.Port[e.ports.size()]));
+                } else {
+                    endpoint = new NetworkTerminal(e.name, e.properties);
+                }
+                stack.peek().ports.add(new NetworkSwitch.Port(endpoint));
+            } else if ("property".equals(localName)) {
+                if (!inPropertyElement) {
+                    throw new IllegalStateException("Improperly nested property element encountered");
+                }
+                inPropertyElement = false;
+            }
+        }
+
+        @Override
+        public void startElement(String uri, String localName, String qName, Attributes atts) throws SAXException {
+            if ("cluster-topology".equals(localName)) {
+                if (!stack.isEmpty()) {
+                    throw new IllegalStateException("Encountered unexpected " + qName);
+                }
+                stack.push(new ElementStackEntry(null, ""));
+            } else if ("network-switch".equals(localName) || "terminal".equals(localName)) {
+                // Without an enclosing entry, endElement would have no parent to attach the endpoint to.
+                requireEnclosingElement(localName);
+                String name = atts.getValue("", "name");
+                if (name == null) {
+                    throw new IllegalStateException("Encountered " + localName + " element with no name attribute");
+                }
+                EndpointType type = "network-switch".equals(localName) ? EndpointType.NETWORK_SWITCH
+                        : EndpointType.NETWORK_TERMINAL;
+                stack.push(new ElementStackEntry(type, name));
+            } else if ("property".equals(localName)) {
+                // Properties are stored on the entry of the enclosing element.
+                requireEnclosingElement(localName);
+                if (inPropertyElement) {
+                    throw new IllegalStateException("Improperly nested property element encountered");
+                }
+                String name = atts.getValue("", "name");
+                if (name == null) {
+                    throw new IllegalStateException("Encountered " + localName + " element with no name attribute");
+                }
+                String value = atts.getValue("", "value");
+                if (value == null) {
+                    throw new IllegalStateException("Encountered " + localName + " element with no value attribute");
+                }
+                stack.peek().properties.put(name, value);
+                inPropertyElement = true;
+            }
+        }
+    }
+
+    private static class ElementStackEntry {
+        private final EndpointType type;
+
+        private final String name;
+
+        private final Map<String, String> properties;
+
+        private final List<NetworkSwitch.Port> ports;
+
+        public ElementStackEntry(EndpointType type, String name) {
+            this.type = type;
+            this.name = name;
+            this.properties = new HashMap<String, String>();
+            ports = new ArrayList<NetworkSwitch.Port>();
+        }
+    }
+}
\ No newline at end of file
diff --git a/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
new file mode 100644
index 0000000..1185b5e
--- /dev/null
+++ b/fullstack/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/util/JavaSerializationUtils.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.Serializable;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Proxy;
+
+/**
+ * Helpers for converting objects to and from byte arrays using Java serialization, resolving
+ * classes against a caller-supplied class loader when reading.
+ */
+public class JavaSerializationUtils {
+    /**
+     * Serializes an object into a byte array.
+     *
+     * @param jobSpec
+     *            - The object to serialize.
+     * @return The serialized form of the object.
+     * @throws IOException
+     *             - If the object stream fails to write the object.
+     */
+    public static byte[] serialize(Serializable jobSpec) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        try {
+            oos.writeObject(jobSpec);
+            // Push anything still buffered by the object stream into baos before it is read back.
+            oos.flush();
+        } finally {
+            oos.close();
+        }
+        return baos.toByteArray();
+    }
+
+    /**
+     * Deserializes an object, resolving its classes through the given class loader. The loader is
+     * also installed as the thread's context class loader for the duration of the call.
+     * <p>
+     * NOTE(review): Java deserialization is unsafe on untrusted input; confirm that callers only
+     * pass bytes from trusted sources.
+     *
+     * @param bytes
+     *            - Serialized form of an object; may be <code>null</code>.
+     * @param classLoader
+     *            - Class loader used to resolve the classes named in the stream.
+     * @return The deserialized object, or <code>null</code> if bytes is <code>null</code>.
+     * @throws IOException
+     *             - If the object stream fails to read the object.
+     * @throws ClassNotFoundException
+     *             - If a class named in the stream cannot be resolved.
+     */
+    public static Object deserialize(byte[] bytes, ClassLoader classLoader) throws IOException, ClassNotFoundException {
+        if (bytes == null) {
+            return null;
+        }
+        Thread current = Thread.currentThread();
+        ClassLoader ctxCL = current.getContextClassLoader();
+        try {
+            current.setContextClassLoader(classLoader);
+            ObjectInputStream ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader);
+            try {
+                return ois.readObject();
+            } finally {
+                ois.close();
+            }
+        } finally {
+            current.setContextClassLoader(ctxCL);
+        }
+    }
+
+    /**
+     * Object input stream that resolves classes and proxy interfaces through a given class loader.
+     */
+    private static class ClassLoaderObjectInputStream extends ObjectInputStream {
+        private static final Class<?>[] PRIMITIVE_TYPES = { boolean.class, byte.class, char.class, short.class,
+                int.class, long.class, float.class, double.class, void.class };
+
+        private final ClassLoader classLoader;
+
+        protected ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException,
+                SecurityException {
+            super(in);
+            this.classLoader = classLoader;
+        }
+
+        @Override
+        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
+            String className = desc.getName();
+            try {
+                return Class.forName(className, false, classLoader);
+            } catch (ClassNotFoundException e) {
+                // Class.forName cannot look up primitive types by name, yet a stream may contain
+                // them (e.g. a serialized int.class), so they are resolved here.
+                Class<?> primitive = primitiveClassForName(className);
+                if (primitive == null) {
+                    throw e;
+                }
+                return primitive;
+            }
+        }
+
+        /**
+         * @return The primitive type (or void) with the given name, or <code>null</code> if there is none.
+         */
+        private static Class<?> primitiveClassForName(String name) {
+            for (Class<?> candidate : PRIMITIVE_TYPES) {
+                if (candidate.getName().equals(name)) {
+                    return candidate;
+                }
+            }
+            return null;
+        }
+
+        @Override
+        protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
+            ClassLoader nonPublicLoader = null;
+            boolean hasNonPublicInterface = false;
+
+            // define proxy in class loader of non-public interface(s), if any
+            Class<?>[] classObjs = new Class<?>[interfaces.length];
+            for (int i = 0; i < interfaces.length; i++) {
+                Class<?> cl = Class.forName(interfaces[i], false, classLoader);
+                if ((cl.getModifiers() & Modifier.PUBLIC) == 0) {
+                    if (hasNonPublicInterface) {
+                        if (nonPublicLoader != cl.getClassLoader()) {
+                            throw new IllegalAccessError("conflicting non-public interface class loaders");
+                        }
+                    } else {
+                        nonPublicLoader = cl.getClassLoader();
+                        hasNonPublicInterface = true;
+                    }
+                }
+                classObjs[i] = cl;
+            }
+            try {
+                return Proxy.getProxyClass(hasNonPublicInterface ? nonPublicLoader : classLoader, classObjs);
+            } catch (IllegalArgumentException e) {
+                throw new ClassNotFoundException(null, e);
+            }
+        }
+    }
+}
\ No newline at end of file