Merged hyracks-next

Notable in this merge: job/stage-aware connection handling with an abort path (IConnectionEntry, ConnectionManager), a reworked constraint hierarchy (LocationConstraintType with ABSOLUTE/CHOICE, abstract PartitionConstraint with EXPLICIT and COUNT forms, new ChoiceLocationConstraint / ExplicitPartitionConstraint / PartitionCountConstraint), cluster-controller heartbeats and client application control calls, NodeParameters returned at node registration, producer/consumer partition counts passed to connectors, a JOL dependency with a -use-jol option, and a FrameTupleAppender field-offset fix.

git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@25 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/.classpath b/hyracks-core/.classpath
index 370f43c..2fb3f21 100644
--- a/hyracks-core/.classpath
+++ b/hyracks-core/.classpath
@@ -2,6 +2,7 @@
 <classpath>
 	<classpathentry kind="src" path="src/main/java"/>
 	<classpathentry kind="src" path="src/test/java"/>
+	<classpathentry kind="src" path="src/main/resources"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
 	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
 	<classpathentry kind="output" path="target/classes"/>
diff --git a/hyracks-core/pom.xml b/hyracks-core/pom.xml
index 04f81dc..0222956 100644
--- a/hyracks-core/pom.xml
+++ b/hyracks-core/pom.xml
@@ -2,7 +2,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>edu.uci.ics.hyracks</groupId>
   <artifactId>hyracks-core</artifactId>
-  <version>0.0.4</version>
+  <version>0.0.5-SNAPSHOT</version>
   <build>
     <plugins>
       <plugin>
@@ -129,6 +129,13 @@
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
+  	<dependency>
+  		<groupId>jol</groupId>
+  		<artifactId>jol</artifactId>
+  		<version>1.0.0</version>
+  		<type>jar</type>
+  		<scope>compile</scope>
+  	</dependency>
   </dependencies>
   <reporting>
     <plugins>
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
index 125c1aa..9e3cbbe 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IConnectionEntry.java
@@ -17,6 +17,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
+import java.util.UUID;
 
 public interface IConnectionEntry {
     ByteBuffer getReadBuffer();
@@ -32,4 +33,16 @@
     void close() throws IOException;
 
     void write(ByteBuffer buffer);
+
+    UUID getJobId();
+
+    UUID getStageId();
+
+    void setJobId(UUID jobId);
+
+    void setStageId(UUID stageId);
+
+    boolean aborted();
+
+    void abort();
 }
\ No newline at end of file
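
The new accessors let the communication layer tag each physical connection with the job and stage it serves, so a failed stage's traffic can be found and torn down. A minimal sketch of that lookup-and-abort loop, roughly mirroring what ConnectionManager.abortConnections does later in this patch (the helper class and method names are illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.UUID;

    import edu.uci.ics.hyracks.api.comm.IConnectionEntry;

    // Hypothetical helper: collect the entries tagged with the given job/stage and abort them.
    final class ConnectionAbortSketch {
        static List<IConnectionEntry> abortMatching(Set<IConnectionEntry> connections, UUID jobId, UUID stageId) {
            List<IConnectionEntry> aborted = new ArrayList<IConnectionEntry>();
            for (IConnectionEntry ce : connections) {
                if (jobId.equals(ce.getJobId()) && stageId.equals(ce.getStageId())) {
                    ce.abort();                // readers observe aborted() == true and stop waiting for data
                    aborted.add(ce);
                }
            }
            return aborted;
        }
    }
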
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java
index 91f921c..7e74aee 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/comm/IDataReceiveListenerFactory.java
@@ -18,4 +18,8 @@
 
 public interface IDataReceiveListenerFactory {
     public IDataReceiveListener getDataReceiveListener(UUID endpointUUID, IConnectionEntry entry, int senderIndex);
+
+    public UUID getJobId();
+
+    public UUID getStageId();
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java
index 858bb65..bbd4b4f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java
@@ -27,8 +27,8 @@
     }
 
     @Override
-    public ConstraintType getConstraintType() {
-        return ConstraintType.ABSOLUTE;
+    public LocationConstraintType getConstraintType() {
+        return LocationConstraintType.ABSOLUTE;
     }
 
     public String getLocationId() {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
new file mode 100644
index 0000000..edfb1c5
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+public class ChoiceLocationConstraint extends LocationConstraint {
+    private static final long serialVersionUID = 1L;
+
+    private LocationConstraint[] choices;
+
+    public ChoiceLocationConstraint(LocationConstraint... choices) {
+        this.choices = choices;
+    }
+
+    public LocationConstraint[] getChoices() {
+        return choices;
+    }
+
+    @Override
+    public LocationConstraintType getConstraintType() {
+        return LocationConstraintType.CHOICE;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ExplicitPartitionConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ExplicitPartitionConstraint.java
new file mode 100644
index 0000000..16b7b24
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ExplicitPartitionConstraint.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+import java.util.Arrays;
+
+public class ExplicitPartitionConstraint extends PartitionConstraint {
+    private static final long serialVersionUID = 1L;
+
+    private final LocationConstraint[] locationConstraints;
+
+    public ExplicitPartitionConstraint(LocationConstraint[] locationConstraints) {
+        this.locationConstraints = locationConstraints;
+    }
+
+    public LocationConstraint[] getLocationConstraints() {
+        return locationConstraints;
+    }
+
+    @Override
+    public String toString() {
+        return Arrays.deepToString(locationConstraints);
+    }
+
+    @Override
+    public PartitionConstraintType getPartitionConstraintType() {
+        return PartitionConstraintType.EXPLICIT;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
index e88a1b2..1228b90 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
@@ -19,9 +19,10 @@
 public abstract class LocationConstraint implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    public enum ConstraintType {
+    public enum LocationConstraintType {
         ABSOLUTE,
+        CHOICE
     }
 
-    public abstract ConstraintType getConstraintType();
+    public abstract LocationConstraintType getConstraintType();
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java
index c825539..82a6f7b 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java
@@ -15,22 +15,14 @@
 package edu.uci.ics.hyracks.api.constraints;
 
 import java.io.Serializable;
-import java.util.Arrays;
 
-public class PartitionConstraint implements Serializable {
+public abstract class PartitionConstraint implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final LocationConstraint[] locationConstraints;
+    public abstract PartitionConstraintType getPartitionConstraintType();
 
-    public PartitionConstraint(LocationConstraint[] locationConstraints) {
-        this.locationConstraints = locationConstraints;
-    }
-
-    public LocationConstraint[] getLocationConstraints() {
-        return locationConstraints;
-    }
-
-    public String toString() {
-        return Arrays.deepToString(locationConstraints);
+    public enum PartitionConstraintType {
+        EXPLICIT,
+        COUNT
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionCountConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionCountConstraint.java
new file mode 100644
index 0000000..b49d880
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionCountConstraint.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+public class PartitionCountConstraint extends PartitionConstraint {
+    private static final long serialVersionUID = 1L;
+
+    private final int count;
+
+    public PartitionCountConstraint(int count) {
+        this.count = count;
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+    @Override
+    public PartitionConstraintType getPartitionConstraintType() {
+        return PartitionConstraintType.COUNT;
+    }
+}
\ No newline at end of file
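
With PartitionConstraint now abstract, a scheduler dispatches on the constraint type instead of assuming an explicit location list. A minimal sketch of that dispatch, assuming only the classes introduced above (the helper class itself is hypothetical):

    import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
    import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
    import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;

    // Hypothetical helper: how many partitions does this constraint call for?
    final class PartitionCountSketch {
        static int countPartitions(PartitionConstraint pc) {
            switch (pc.getPartitionConstraintType()) {
                case EXPLICIT:
                    // one partition per location constraint (absolute or choice)
                    return ((ExplicitPartitionConstraint) pc).getLocationConstraints().length;
                case COUNT:
                    // a bare count; the scheduler picks a node for each partition
                    return ((PartitionCountConstraint) pc).getCount();
                default:
                    throw new IllegalArgumentException("Unhandled constraint type: " + pc.getPartitionConstraintType());
            }
        }
    }
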
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
index af86093..6461a05 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/IClusterController.java
@@ -17,7 +17,6 @@
 import java.rmi.Remote;
 import java.util.EnumSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -27,14 +26,25 @@
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 
 public interface IClusterController extends Remote {
-    public void registerNode(INodeController nodeController) throws Exception;
+    public NodeParameters registerNode(INodeController nodeController) throws Exception;
 
     public void unregisterNode(INodeController nodeController) throws Exception;
 
-    public INodeController lookupNode(String id) throws Exception;
+    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+        StageletStatistics statistics) throws Exception;
 
-    public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
-            throws Exception;
+    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
+
+    public void nodeHeartbeat(String id) throws Exception;
+
+    /*
+     * Client Application Control Methods.
+     */
+    public void createApplication(String appName) throws Exception;
+
+    public void startApplication(String appName) throws Exception;
+
+    public void destroyApplication(String appName) throws Exception;
 
     /*
      * Client Job Control methods.
@@ -48,6 +58,6 @@
     public void start(UUID jobId) throws Exception;
 
     public JobStatistics waitForCompletion(UUID jobId) throws Exception;
-    
-    public Map<String,INodeController> getRegistry() throws Exception;
+
+    public Map<String, INodeController> getRegistry() throws Exception;
 }
\ No newline at end of file
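
The cluster controller also grows a client-facing application lifecycle next to the existing job-control calls. A minimal sketch of how a client might drive it (the application name and wrapper class are arbitrary examples):

    import edu.uci.ics.hyracks.api.controller.IClusterController;

    // Hypothetical client: deploy an application, run against it, then tear it down.
    final class ApplicationLifecycleSketch {
        static void runOnce(IClusterController cc) throws Exception {
            cc.createApplication("example-app");    // "example-app" is an illustrative name
            cc.startApplication("example-app");
            try {
                // ... createJob / start / waitForCompletion against the running application ...
            } finally {
                cc.destroyApplication("example-app");
            }
        }
    }
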
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
index c271356..264254e 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/INodeController.java
@@ -16,28 +16,33 @@
 
 import java.rmi.Remote;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
 import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobStage;
 import edu.uci.ics.hyracks.config.NCConfig;
 
 public interface INodeController extends Remote {
     public String getId() throws Exception;
-    
+
     public NCConfig getConfiguration() throws Exception;
 
     public NodeCapability getNodeCapability() throws Exception;
 
-    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, JobStage stage)
-            throws Exception;
+    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
+        Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) throws Exception;
 
-    public void initializeJobletPhase2(UUID jobId, JobPlan plan, JobStage stage,
-            Map<PortInstanceId, Endpoint> globalPortMap) throws Exception;
+    public void initializeJobletPhase2(UUID jobId, JobPlan plan, UUID stageId, Map<ActivityNodeId, Set<Integer>> tasks,
+        Map<OperatorDescriptorId, Set<Integer>> opPartitions, Map<PortInstanceId, Endpoint> globalPortMap)
+        throws Exception;
 
-    public void commitJobletInitialization(UUID jobId, JobPlan plan, JobStage stage) throws Exception;
+    public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception;
+
+    public void abortJoblet(UUID jobId, UUID stageId) throws Exception;
 
     public void cleanUpJob(UUID jobId) throws Exception;
 
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/NodeParameters.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/NodeParameters.java
new file mode 100644
index 0000000..3856fea
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/controller/NodeParameters.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.controller;
+
+import java.io.Serializable;
+
+public class NodeParameters implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private int heartbeatPeriod;
+
+    public int getHeartbeatPeriod() {
+        return heartbeatPeriod;
+    }
+
+    public void setHeartbeatPeriod(int heartbeatPeriod) {
+        this.heartbeatPeriod = heartbeatPeriod;
+    }
+}
\ No newline at end of file
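
registerNode now returns these parameters so the node controller knows how often the cluster controller expects a heartbeat. A minimal sketch of scheduling that heartbeat with a java.util.Timer (the helper class is illustrative; the real node controller may schedule differently):

    import java.util.Timer;
    import java.util.TimerTask;

    import edu.uci.ics.hyracks.api.controller.IClusterController;
    import edu.uci.ics.hyracks.api.controller.NodeParameters;

    // Hypothetical helper: ping the CC every heartbeatPeriod milliseconds.
    final class HeartbeatSketch {
        static Timer startHeartbeats(final IClusterController cc, final String nodeId, NodeParameters params) {
            Timer timer = new Timer("heartbeat-" + nodeId, true);
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    try {
                        cc.nodeHeartbeat(nodeId);   // remote call declared on IClusterController
                    } catch (Exception e) {
                        e.printStackTrace();        // a real NC would react to a lost CC here
                    }
                }
            }, 0, params.getHeartbeatPeriod());
            return timer;
        }
    }
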
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
index c7f3175..86d8118 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/ConnectorDescriptorId.java
@@ -46,4 +46,9 @@
         ConnectorDescriptorId other = (ConnectorDescriptorId) obj;
         return id.equals(other.id);
     }
+
+    @Override
+    public String toString() {
+        return "CDID:" + id;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
new file mode 100644
index 0000000..c884998
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow;
+
+public enum Direction {
+    INPUT,
+    OUTPUT,
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 600fd41..b3feef8 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -50,11 +50,15 @@
      *            Endpoint writer factory.
      * @param index
      *            ordinal index of the data producer partition.
+     * @param nProducerPartitions
+     *            Number of partitions of the producing operator.
+     * @param nConsumerPartitions
+     *            Number of partitions of the consuming operator.
      * @return data writer.
      * @throws Exception
      */
     public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-            int index) throws HyracksDataException;
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
 
     /**
      * Factory method to create the receive side reader that reads data from this connector.
@@ -67,11 +71,15 @@
      *            Connection Demultiplexer
      * @param index
      *            ordinal index of the data consumer partition
+     * @param nProducerPartitions
+     *            Number of partitions of the producing operator.
+     * @param nConsumerPartitions
+     *            Number of partitions of the consuming operator.
      * @return data reader
      * @throws HyracksDataException
      */
     public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-            int index) throws HyracksDataException;
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException;
 
     /**
      * Translate this connector descriptor to JSON.
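
Send-side writers and receive-side readers now receive both partition counts up front, so a connector (for example a hash partitioner) can size its routing without consulting the plan. A minimal sketch of the reshaped call site (the wrapper class and parameter names are illustrative):

    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
    import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.api.job.JobPlan;
    import edu.uci.ics.hyracks.context.HyracksContext;

    // Hypothetical call site: the caller resolves both partition counts and passes them through.
    final class ConnectorCallSketch {
        static IFrameWriter openSendSide(IConnectorDescriptor conn, HyracksContext ctx, JobPlan plan,
                IEndpointDataWriterFactory edwFactory, int producerIndex, int nProducers, int nConsumers)
                throws HyracksDataException {
            return conn.createSendSideWriter(ctx, plan, edwFactory, producerIndex, nProducers, nConsumers);
        }
    }
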
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index 17a0fc5..ad51370 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -65,21 +65,6 @@
     public void setPartitionConstraint(PartitionConstraint partitionConstraint);
 
     /**
-     * Returns the final partition locations selected for scheduling. These are decided by Hyracks such that they satisfy the partition constraints.
-     * 
-     * @return array indicating number and node ids of the nodes to schedule the operator runtimes.
-     */
-    public String[] getPartitions();
-
-    /**
-     * Sets the partition locations.
-     * 
-     * @param partitions
-     *            node ids to schedule the operators.
-     */
-    public void setPartitions(String[] partitions);
-
-    /**
      * Gets the output record descriptor
      * 
      * @return Array of RecordDescriptor, one per output.
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
index d8057d9..844266d 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
@@ -17,11 +17,6 @@
 import java.io.Serializable;
 
 public final class PortInstanceId implements Serializable {
-    public enum Direction {
-        INPUT,
-        OUTPUT,
-    }
-
     private static final long serialVersionUID = 1L;
 
     private OperatorDescriptorId odId;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
index 921b239..8784b46 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/job/statistics/JobStatistics.java
@@ -64,8 +64,8 @@
         StringBuilder buffer = new StringBuilder();
 
         buffer.append("{\n");
-        indent(buffer, 1).append("startTime: '").append(df.format(startTime)).append("',\n");
-        indent(buffer, 1).append("endTime: '").append(df.format(endTime)).append("',\n");
+        indent(buffer, 1).append("startTime: '").append(startTime == null ? null : df.format(startTime)).append("',\n");
+        indent(buffer, 1).append("endTime: '").append(endTime == null ? null : df.format(endTime)).append("',\n");
         indent(buffer, 1).append("stages: [\n");
         boolean first = true;
         for (StageStatistics ss : stages) {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResource.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResource.java
new file mode 100644
index 0000000..2d76c26
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/IResource.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+import java.io.Serializable;
+
+public interface IResource extends Serializable {
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ISpaceSharedResource.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ISpaceSharedResource.java
new file mode 100644
index 0000000..3a6f930
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ISpaceSharedResource.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+public interface ISpaceSharedResource {
+
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ITimeSharedResource.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ITimeSharedResource.java
new file mode 100644
index 0000000..1fb0465
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/resources/ITimeSharedResource.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.resources;
+
+public interface ITimeSharedResource {
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
index 4d9c309..ea8ede3 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
@@ -18,10 +18,10 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.UUID;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import edu.uci.ics.hyracks.api.comm.FrameConstants;
 import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
 import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
 import edu.uci.ics.hyracks.context.HyracksContext;
@@ -41,6 +41,12 @@
 
     private final SelectionKey key;
 
+    private UUID jobId;
+
+    private UUID stageId;
+
+    private boolean aborted;
+
     public ConnectionEntry(HyracksContext ctx, SocketChannel socketChannel, SelectionKey key) {
         this.socketChannel = socketChannel;
         readBuffer = ctx.getResourceManager().allocateFrame();
@@ -55,42 +61,46 @@
     }
 
     public boolean dispatch(SelectionKey key) throws IOException {
-        if (key.isReadable()) {
-            if (LOGGER.isLoggable(Level.FINER)) {
-                LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
-            }
-            int bytesRead = socketChannel.read(readBuffer);
-            if (bytesRead < 0) {
-                recvListener.eos(this);
-                return true;
-            }
-            if (LOGGER.isLoggable(Level.FINER)) {
-                LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
-            }
+        if (aborted) {
             recvListener.dataReceived(this);
-        } else if (key.isWritable()) {
-            synchronized (this) {
-                writeBuffer.flip();
+        } else {
+            if (key.isReadable()) {
                 if (LOGGER.isLoggable(Level.FINER)) {
-                    LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
                 }
-                int bytesWritten = socketChannel.write(writeBuffer);
-                if (bytesWritten < 0) {
+                int bytesRead = socketChannel.read(readBuffer);
+                if (bytesRead < 0) {
+                    recvListener.eos(this);
                     return true;
                 }
                 if (LOGGER.isLoggable(Level.FINER)) {
-                    LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
                 }
-                if (writeBuffer.remaining() <= 0) {
-                    int ops = key.interestOps();
-                    key.interestOps(ops & ~SelectionKey.OP_WRITE);
+                recvListener.dataReceived(this);
+            } else if (key.isWritable()) {
+                synchronized (this) {
+                    writeBuffer.flip();
+                    if (LOGGER.isLoggable(Level.FINER)) {
+                        LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    }
+                    int bytesWritten = socketChannel.write(writeBuffer);
+                    if (bytesWritten < 0) {
+                        return true;
+                    }
+                    if (LOGGER.isLoggable(Level.FINER)) {
+                        LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    }
+                    if (writeBuffer.remaining() <= 0) {
+                        int ops = key.interestOps();
+                        key.interestOps(ops & ~SelectionKey.OP_WRITE);
+                    }
+                    writeBuffer.compact();
+                    notifyAll();
                 }
-                writeBuffer.compact();
-                notifyAll();
+            } else {
+                LOGGER.warning("Spurious event triggered: " + key.readyOps());
+                return true;
             }
-        } else {
-            LOGGER.warning("Spurious event triggered: " + key.readyOps());
-            return true;
         }
         return false;
     }
@@ -135,12 +145,46 @@
     }
 
     @Override
-    public void close() throws IOException {
-        socketChannel.close();
+    public void close() {
+        try {
+            socketChannel.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
     }
 
     @Override
     public SelectionKey getSelectionKey() {
         return key;
     }
+
+    @Override
+    public UUID getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public void setJobId(UUID jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public UUID getStageId() {
+        return stageId;
+    }
+
+    @Override
+    public void setStageId(UUID stageId) {
+        this.stageId = stageId;
+    }
+
+    @Override
+    public void abort() {
+        aborted = true;
+    }
+
+    @Override
+    public boolean aborted() {
+        return aborted;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
index bfca6ad..55e2962 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
@@ -26,9 +26,11 @@
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -62,6 +64,8 @@
 
     private final IDataReceiveListener initialDataReceiveListener;
 
+    private final Set<IConnectionEntry> connections;
+
     private volatile boolean stopped;
 
     private ByteBuffer emptyFrame;
@@ -76,7 +80,7 @@
 
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Connection manager listening on " + serverSocket.getInetAddress() + ":"
-                    + serverSocket.getLocalPort());
+                + serverSocket.getLocalPort());
         }
 
         pendingConnectionReceivers = new HashMap<UUID, IDataReceiveListenerFactory>();
@@ -85,6 +89,7 @@
         initialDataReceiveListener = new InitialDataReceiveListener();
         emptyFrame = ctx.getResourceManager().allocateFrame();
         emptyFrame.putInt(FrameHelper.getTupleCountOffset(ctx), 0);
+        connections = new HashSet<IConnectionEntry>();
     }
 
     public synchronized void dumpStats() {
@@ -116,7 +121,7 @@
     public IFrameWriter connect(NetworkAddress address, UUID id, int senderId) throws HyracksDataException {
         try {
             SocketChannel channel = SocketChannel
-                    .open(new InetSocketAddress(address.getIpAddress(), address.getPort()));
+                .open(new InetSocketAddress(address.getIpAddress(), address.getPort()));
             byte[] initialFrame = new byte[INITIAL_MESSAGE_LEN];
             ByteBuffer buffer = ByteBuffer.wrap(initialFrame);
             buffer.clear();
@@ -173,6 +178,18 @@
         pendingConnectionReceivers.remove(id);
     }
 
+    public synchronized void abortConnections(UUID jobId, UUID stageId) {
+        List<IConnectionEntry> abortConnections = new ArrayList<IConnectionEntry>();
+        synchronized (this) {
+            for (IConnectionEntry ce : connections) {
+                if (ce.getJobId().equals(jobId) && ce.getStageId().equals(stageId)) {
+                    abortConnections.add(ce);
+                }
+            }
+        }
+        dataListenerThread.addPendingAbortConnections(abortConnections);
+    }
+
     private final class NetworkFrameWriter implements IFrameWriter {
         private SocketChannel channel;
 
@@ -237,7 +254,8 @@
     private final class DataListenerThread extends Thread {
         private Selector selector;
 
-        private List<SocketChannel> pendingSockets;
+        private List<SocketChannel> pendingNewSockets;
+        private List<IConnectionEntry> pendingAbortConnections;
 
         public DataListenerThread() {
             super("Hyracks Data Listener Thread");
@@ -246,12 +264,17 @@
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
-            pendingSockets = new ArrayList<SocketChannel>();
+            pendingNewSockets = new ArrayList<SocketChannel>();
+            pendingAbortConnections = new ArrayList<IConnectionEntry>();
         }
 
         synchronized void addSocketChannel(SocketChannel sc) throws IOException {
-            LOGGER.info("Connection received");
-            pendingSockets.add(sc);
+            pendingNewSockets.add(sc);
+            selector.wakeup();
+        }
+
+        synchronized void addPendingAbortConnections(List<IConnectionEntry> abortConnections) {
+            pendingAbortConnections.addAll(abortConnections);
             selector.wakeup();
         }
 
@@ -264,8 +287,8 @@
                     }
                     int n = selector.select();
                     synchronized (this) {
-                        if (!pendingSockets.isEmpty()) {
-                            for (SocketChannel sc : pendingSockets) {
+                        if (!pendingNewSockets.isEmpty()) {
+                            for (SocketChannel sc : pendingNewSockets) {
                                 sc.configureBlocking(false);
                                 SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);
                                 ConnectionEntry entry = new ConnectionEntry(ctx, sc, scKey);
@@ -275,7 +298,20 @@
                                     LOGGER.fine("Woke up selector");
                                 }
                             }
-                            pendingSockets.clear();
+                            pendingNewSockets.clear();
+                        }
+                        if (!pendingAbortConnections.isEmpty()) {
+                            for (IConnectionEntry ce : pendingAbortConnections) {
+                                SelectionKey key = ce.getSelectionKey();
+                                ce.abort();
+                                ((ConnectionEntry) ce).dispatch(key);
+                                key.cancel();
+                                ce.close();
+                                synchronized (ConnectionManager.this) {
+                                    connections.remove(ce);
+                                }
+                            }
+                            pendingAbortConnections.clear();
                         }
                         if (LOGGER.isLoggable(Level.FINE)) {
                             LOGGER.fine("Selector: " + n);
@@ -295,11 +331,14 @@
                                 if (close) {
                                     key.cancel();
                                     entry.close();
+                                    synchronized (ConnectionManager.this) {
+                                        connections.remove(entry);
+                                    }
                                 }
                             }
                         }
                     }
-                } catch (IOException e) {
+                } catch (Exception e) {
                     e.printStackTrace();
                 }
             }
@@ -331,6 +370,11 @@
 
                 newListener = connectionReceiver.getDataReceiveListener(endpointID, entry, senderId);
                 entry.setDataReceiveListener(newListener);
+                entry.setJobId(connectionReceiver.getJobId());
+                entry.setStageId(connectionReceiver.getStageId());
+                synchronized (ConnectionManager.this) {
+                    connections.add(entry);
+                }
                 byte[] ack = new byte[4];
                 ByteBuffer ackBuffer = ByteBuffer.wrap(ack);
                 ackBuffer.clear();
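
abortConnections is the piece the node controller needs in order to honor the new INodeController.abortJoblet call: the failed stage's connections are handed to the data listener thread, dispatched once in the aborted state, and closed. A minimal sketch of that wiring (the surrounding class is hypothetical):

    import java.util.UUID;

    import edu.uci.ics.hyracks.comm.ConnectionManager;

    // Hypothetical wiring: an abortJoblet implementation forwards to the connection manager.
    final class AbortJobletSketch {
        private final ConnectionManager connectionManager;

        AbortJobletSketch(ConnectionManager connectionManager) {
            this.connectionManager = connectionManager;
        }

        void abortJoblet(UUID jobId, UUID stageId) {
            // tear down network traffic for this (job, stage) before discarding the stagelet
            connectionManager.abortConnections(jobId, stageId);
        }
    }
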
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
index 3eb2280..694e61a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
@@ -32,7 +32,7 @@
 import edu.uci.ics.hyracks.context.HyracksContext;
 
 public class DemuxDataReceiveListenerFactory implements IDataReceiveListenerFactory, IConnectionDemultiplexer,
-        IDataReceiveListener {
+    IDataReceiveListener {
     private static final Logger LOGGER = Logger.getLogger(DemuxDataReceiveListenerFactory.class.getName());
 
     private final NonDeterministicFrameReader frameReader;
@@ -40,10 +40,14 @@
     private final BitSet readyBits;
     private IConnectionEntry senders[];
     private int openSenderCount;
+    private UUID jobId;
+    private UUID stageId;
 
-    public DemuxDataReceiveListenerFactory(HyracksContext ctx) {
+    public DemuxDataReceiveListenerFactory(HyracksContext ctx, UUID jobId, UUID stageId) {
         frameReader = new NonDeterministicFrameReader(ctx, this);
         this.ctx = ctx;
+        this.jobId = jobId;
+        this.stageId = stageId;
         readyBits = new BitSet();
         senders = null;
         openSenderCount = 0;
@@ -66,13 +70,15 @@
         ByteBuffer buffer = entry.getReadBuffer();
         buffer.flip();
         int dataLen = buffer.remaining();
-        if (dataLen >= ctx.getFrameSize()) {
+        if (dataLen >= ctx.getFrameSize() || entry.aborted()) {
             if (LOGGER.isLoggable(Level.FINEST)) {
                 LOGGER.finest("NonDeterministicDataReceiveListener: frame received: sender = " + senderIndex);
             }
             SelectionKey key = entry.getSelectionKey();
-            int ops = key.interestOps();
-            key.interestOps(ops & ~SelectionKey.OP_READ);
+            if (key.isValid()) {
+                int ops = key.interestOps();
+                key.interestOps(ops & ~SelectionKey.OP_READ);
+            }
             readyBits.set(senderIndex);
             notifyAll();
             return;
@@ -139,4 +145,14 @@
     public synchronized int getSenderCount() {
         return senders.length;
     }
+
+    @Override
+    public UUID getJobId() {
+        return jobId;
+    }
+
+    @Override
+    public UUID getStageId() {
+        return stageId;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
index 3f2d449..91350735 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/NonDeterministicFrameReader.java
@@ -55,6 +55,10 @@
         }
         while (true) {
             IConnectionEntry entry = demux.findNextReadyEntry(lastReadSender);
+            if (entry.aborted()) {
+                eos = true;
+                return false;
+            }
             lastReadSender = (Integer) entry.getAttachment();
             ByteBuffer netBuffer = entry.getReadBuffer();
             int tupleCount = netBuffer.getInt(FrameHelper.getTupleCountOffset(ctx));
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
index 2ae094e..70f0248 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
@@ -131,7 +131,7 @@
                 int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
                         - accessor.getFieldStartOffset(tIndex, fields[i]);
                 System.arraycopy(accessor.getBuffer().array(), fSrcStart, buffer.array(), tupleDataEndOffset
-                        + fSrcSlotsLength + fStartOffset, fLen);
+                        + fTargetSlotsLength + fStartOffset, fLen);
                 fEndOffset += fLen;
                 buffer.putShort(tupleDataEndOffset + i * 2, (short) fEndOffset);
                 fStartOffset = fEndOffset;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
index 950569d..3d3652a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/config/CCConfig.java
@@ -22,4 +22,13 @@
 
     @Option(name = "-http-port", usage = "Sets the http port for the admin console")
     public int httpPort;
+
+    @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)")
+    public int heartbeatPeriod = 10000;
+
+    @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
+    public int maxHeartbeatLapsePeriods = 5;
+
+    @Option(name = "-use-jol", usage = "Forces Hyracks to use the JOL based scheduler (default: false)")
+    public boolean useJOL = false;
 }
\ No newline at end of file
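
Taken together, the two heartbeat options bound how long a silent node controller survives in the registry. A minimal sketch of the resulting timeout arithmetic (the helper is illustrative; the field names are the ones declared above):

    import edu.uci.ics.hyracks.config.CCConfig;

    // Hypothetical helper: a node missing maxHeartbeatLapsePeriods consecutive heartbeats
    // is treated as dead after roughly this many milliseconds.
    final class HeartbeatTimeoutSketch {
        static long deadNodeTimeoutMillis(CCConfig ccConfig) {
            // with the defaults: 10000 ms * 5 = 50000 ms
            return (long) ccConfig.heartbeatPeriod * ccConfig.maxHeartbeatLapsePeriods;
        }
    }
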
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/ClusterControllerService.java
deleted file mode 100644
index 0461499..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/ClusterControllerService.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.controller;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.util.EnumSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.eclipse.jetty.server.Handler;
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.server.handler.ContextHandler;
-
-import edu.uci.ics.hyracks.api.controller.IClusterController;
-import edu.uci.ics.hyracks.api.controller.INodeController;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.config.CCConfig;
-import edu.uci.ics.hyracks.job.JobManager;
-import edu.uci.ics.hyracks.web.WebServer;
-
-public class ClusterControllerService extends AbstractRemoteService implements IClusterController {
-    private static final long serialVersionUID = 1L;
-
-    private CCConfig ccConfig;
-
-    private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
-
-    private final Map<String, INodeController> nodeRegistry;
-
-    private WebServer webServer;
-
-    private final JobManager jobManager;
-
-    private final Executor taskExecutor;
-
-    public ClusterControllerService(CCConfig ccConfig) throws Exception {
-        this.ccConfig = ccConfig;
-        nodeRegistry = new LinkedHashMap<String, INodeController>();
-        jobManager = new JobManager(this);
-        taskExecutor = Executors.newCachedThreadPool();
-        webServer = new WebServer(new Handler[] { getAdminConsoleHandler() });
-    }
-
-    @Override
-    public void start() throws Exception {
-        LOGGER.log(Level.INFO, "Starting ClusterControllerService");
-        Registry registry = LocateRegistry.createRegistry(ccConfig.port);
-        registry.rebind(IClusterController.class.getName(), this);
-        webServer.setPort(ccConfig.httpPort);
-        webServer.start();
-        LOGGER.log(Level.INFO, "Started ClusterControllerService");
-    }
-
-    @Override
-    public void stop() throws Exception {
-        LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
-        webServer.stop();
-        LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
-    }
-
-    @Override
-    public UUID createJob(JobSpecification jobSpec) throws Exception {
-        return jobManager.createJob(jobSpec, EnumSet.noneOf(JobFlag.class));
-    }
-
-    @Override
-    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        return jobManager.createJob(jobSpec, jobFlags);
-    }
-
-    @Override
-    public void registerNode(INodeController nodeController) throws Exception {
-        String id = nodeController.getId();
-        synchronized (this) {
-            if (nodeRegistry.containsKey(id)) {
-                throw new Exception("Node with this name already registered.");
-            }
-            nodeRegistry.put(id, nodeController);
-        }
-        nodeController.notifyRegistration(this);
-        LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
-    }
-
-    @Override
-    public void unregisterNode(INodeController nodeController) throws Exception {
-        String id = nodeController.getId();
-        synchronized (this) {
-            nodeRegistry.remove(id);
-        }
-        LOGGER.log(Level.INFO, "Unregistered INodeController");
-    }
-
-    @Override
-    public INodeController lookupNode(String id) throws Exception {
-        return nodeRegistry.get(id);
-    }
-
-    public Executor getExecutor() {
-        return taskExecutor;
-    }
-
-    public synchronized void notifyJobComplete(final UUID jobId) {
-        for (final INodeController nc : nodeRegistry.values()) {
-            taskExecutor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        nc.cleanUpJob(jobId);
-                    } catch (Exception e) {
-                    }
-                }
-
-            });
-        }
-    }
-
-    @Override
-    public void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId, StageletStatistics statistics)
-            throws Exception {
-        jobManager.notifyStageletComplete(jobId, stageId, nodeId, statistics);
-    }
-
-    @Override
-    public JobStatus getJobStatus(UUID jobId) throws Exception {
-        return jobManager.getJobStatus(jobId);
-    }
-
-    @Override
-    public void start(UUID jobId) throws Exception {
-        jobManager.start(jobId);
-    }
-
-    @Override
-    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
-        return jobManager.waitForCompletion(jobId);
-    }
-
-    private Handler getAdminConsoleHandler() {
-        ContextHandler handler = new ContextHandler("/admin");
-        handler.setHandler(new AbstractHandler() {
-            @Override
-            public void handle(String target, Request baseRequest, HttpServletRequest request,
-                    HttpServletResponse response) throws IOException, ServletException {
-                if (!"/".equals(target)) {
-                    return;
-                }
-                response.setContentType("text/html;charset=utf-8");
-                response.setStatus(HttpServletResponse.SC_OK);
-                baseRequest.setHandled(true);
-                PrintWriter writer = response.getWriter();
-                writer.println("<html><head><title>Hyracks Admin Console</title></head><body>");
-                writer.println("<h1>Hyracks Admin Console</h1>");
-                writer.println("<h2>Node Controllers</h2>");
-                writer.println("<table><tr><td>Node Id</td><td>Host</td></tr>");
-                synchronized (ClusterControllerService.this) {
-                    for (Map.Entry<String, INodeController> e : nodeRegistry.entrySet()) {
-                        try {
-                            INodeController n = e.getValue();
-                            writer.print("<tr><td>");
-                            writer.print(e.getKey());
-                            writer.print("</td><td>");
-                            writer.print("</td></tr>");
-                        } catch (Exception ex) {
-                        }
-                    }
-                }
-                writer.println("</table>");
-                writer.println("</body></html>");
-                writer.flush();
-            }
-        });
-        return handler;
-    }
-
-    @Override
-    public Map<String, INodeController> getRegistry() throws Exception {
-        return nodeRegistry;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/NodeControllerService.java
deleted file mode 100644
index dbdf1c9..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/NodeControllerService.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.controller;
-
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import edu.uci.ics.hyracks.api.comm.Endpoint;
-import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.controller.IClusterController;
-import edu.uci.ics.hyracks.api.controller.INodeController;
-import edu.uci.ics.hyracks.api.controller.NodeCapability;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
-import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStage;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.comm.ConnectionManager;
-import edu.uci.ics.hyracks.comm.DemuxDataReceiveListenerFactory;
-import edu.uci.ics.hyracks.config.NCConfig;
-import edu.uci.ics.hyracks.context.HyracksContext;
-import edu.uci.ics.hyracks.job.Joblet;
-import edu.uci.ics.hyracks.job.Stagelet;
-import edu.uci.ics.hyracks.runtime.OperatorRunnable;
-
-public class NodeControllerService extends AbstractRemoteService implements INodeController {
-    private static final long serialVersionUID = 1L;
-
-    private NCConfig ncConfig;
-
-    private final String id;
-
-    private final HyracksContext ctx;
-
-    private final NodeCapability nodeCapability;
-
-    private final ConnectionManager connectionManager;
-
-    private IClusterController ccs;
-
-    private Map<UUID, Joblet> jobletMap;
-
-    private Executor executor;
-
-    public NodeControllerService(NCConfig ncConfig) throws Exception {
-        this.ncConfig = ncConfig;
-        id = ncConfig.nodeId;
-        this.ctx = new HyracksContext(ncConfig.frameSize);
-        if (id == null) {
-            throw new Exception("id not set");
-        }
-        nodeCapability = computeNodeCapability();
-        connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
-        jobletMap = new HashMap<UUID, Joblet>();
-        executor = Executors.newCachedThreadPool();
-    }
-
-    private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
-
-    @Override
-    public void start() throws Exception {
-        LOGGER.log(Level.INFO, "Starting NodeControllerService");
-        connectionManager.start();
-        Registry registry = LocateRegistry.getRegistry(ncConfig.ccHost, ncConfig.ccPort);
-        IClusterController cc = (IClusterController) registry.lookup(IClusterController.class.getName());
-        cc.registerNode(this);
-        LOGGER.log(Level.INFO, "Started NodeControllerService");
-    }
-
-    @Override
-    public void stop() throws Exception {
-        LOGGER.log(Level.INFO, "Stopping NodeControllerService");
-        connectionManager.stop();
-        LOGGER.log(Level.INFO, "Stopped NodeControllerService");
-    }
-
-    @Override
-    public String getId() throws Exception {
-        return id;
-    }
-
-    @Override
-    public NodeCapability getNodeCapability() throws Exception {
-        return nodeCapability;
-    }
-
-    public ConnectionManager getConnectionManager() {
-        return connectionManager;
-    }
-
-    private static NodeCapability computeNodeCapability() {
-        NodeCapability nc = new NodeCapability();
-        nc.setCPUCount(Runtime.getRuntime().availableProcessors());
-        return nc;
-    }
-
-    private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
-        String ipaddrStr = ncConfig.dataIPAddress;
-        ipaddrStr = ipaddrStr.trim();
-        Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
-        Matcher m = pattern.matcher(ipaddrStr);
-        if (!m.matches()) {
-            throw new Exception(MessageFormat.format(
-                    "Connection Manager IP Address String {0} is not a valid IP Address.", ipaddrStr));
-        }
-        byte[] ipBytes = new byte[4];
-        ipBytes[0] = (byte) Integer.parseInt(m.group(1));
-        ipBytes[1] = (byte) Integer.parseInt(m.group(2));
-        ipBytes[2] = (byte) Integer.parseInt(m.group(3));
-        ipBytes[3] = (byte) Integer.parseInt(m.group(4));
-        return InetAddress.getByAddress(ipBytes);
-    }
-
-    @Override
-    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, JobStage stage)
-            throws Exception {
-        LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stage.getId()
-                + "]: Initializing Joblet Phase 1");
-
-        final Joblet joblet = getLocalJoblet(jobId);
-
-        Stagelet stagelet = new Stagelet(joblet, stage.getId(), id);
-        joblet.setStagelet(stage.getId(), stagelet);
-
-        final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
-        Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
-
-        List<Endpoint> endpointList = new ArrayList<Endpoint>();
-
-        for (ActivityNodeId hanId : stage.getTasks()) {
-            IActivityNode han = plan.getActivityNodeMap().get(hanId);
-            if (LOGGER.isLoggable(Level.FINEST)) {
-                LOGGER.finest("Initializing " + hanId + " -> " + han);
-            }
-            IOperatorDescriptor op = han.getOwner();
-            List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
-            String[] partitions = op.getPartitions();
-            for (int i = 0; i < partitions.length; ++i) {
-                String part = partitions[i];
-                if (id.equals(part)) {
-                    IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i);
-                    OperatorRunnable or = new OperatorRunnable(ctx, hon);
-                    stagelet.setOperator(op.getOperatorId(), i, or);
-                    if (inputs != null) {
-                        for (int j = 0; j < inputs.size(); ++j) {
-                            if (j >= 1) {
-                                throw new IllegalStateException();
-                            }
-                            IConnectorDescriptor conn = inputs.get(j);
-                            Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
-                            endpointList.add(endpoint);
-                            DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx);
-                            connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
-                            PortInstanceId piId = new PortInstanceId(op.getOperatorId(),
-                                    PortInstanceId.Direction.INPUT, plan.getTaskInputMap().get(hanId).get(j), i);
-                            if (LOGGER.isLoggable(Level.FINEST)) {
-                                LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
-                            }
-                            portMap.put(piId, endpoint);
-                            IFrameReader reader = createReader(conn, drlf, i, plan, stagelet);
-                            or.setFrameReader(reader);
-                        }
-                    }
-                    honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
-                }
-            }
-        }
-
-        stagelet.setEndpointList(endpointList);
-
-        return portMap;
-    }
-
-    private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
-            final int receiverIndex, JobPlan plan, final Stagelet stagelet) throws HyracksDataException {
-        final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex);
-
-        return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
-            private int frameCount;
-
-            @Override
-            public void open() throws HyracksDataException {
-                frameCount = 0;
-                reader.open();
-            }
-
-            @Override
-            public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                boolean status = reader.nextFrame(buffer);
-                if (status) {
-                    ++frameCount;
-                }
-                return status;
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                reader.close();
-                stagelet.getStatistics().getStatisticsMap().put(
-                        "framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
-                        String.valueOf(frameCount));
-            }
-        } : reader;
-    }
-
-    @Override
-    public void initializeJobletPhase2(UUID jobId, final JobPlan plan, JobStage stage,
-            final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
-        LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stage.getId()
-                + "]: Initializing Joblet Phase 2");
-        final Joblet ji = getLocalJoblet(jobId);
-        Stagelet si = (Stagelet) ji.getStagelet(stage.getId());
-        final Map<OperatorInstanceId, OperatorRunnable> honMap = si.getOperatorMap();
-
-        final Stagelet stagelet = (Stagelet) ji.getStagelet(stage.getId());
-
-        final JobSpecification spec = plan.getJobSpecification();
-
-        for (ActivityNodeId hanId : stage.getTasks()) {
-            IActivityNode han = plan.getActivityNodeMap().get(hanId);
-            IOperatorDescriptor op = han.getOwner();
-            List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
-            String[] partitions = op.getPartitions();
-            for (int i = 0; i < partitions.length; ++i) {
-                String part = partitions[i];
-                if (id.equals(part)) {
-                    OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
-                    if (outputs != null) {
-                        for (int j = 0; j < outputs.size(); ++j) {
-                            final IConnectorDescriptor conn = outputs.get(j);
-                            final int senderIndex = i;
-                            IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
-                                @Override
-                                public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
-                                    PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
-                                            PortInstanceId.Direction.INPUT, spec.getConsumerInputIndex(conn), index);
-                                    Endpoint ep = globalPortMap.get(piId);
-                                    if (LOGGER.isLoggable(Level.FINEST)) {
-                                        LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
-                                    }
-                                    return createWriter(connectionManager.connect(ep.getNetworkAddress(), ep
-                                            .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
-                                }
-                            };
-                            or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i));
-                        }
-                    }
-                    stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
-                }
-            }
-        }
-    }
-
-    private IFrameWriter createWriter(final IFrameWriter writer, JobPlan plan, final IConnectorDescriptor conn,
-            final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
-        return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameWriter() {
-            private int frameCount;
-
-            @Override
-            public void open() throws HyracksDataException {
-                frameCount = 0;
-                writer.open();
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                ++frameCount;
-                writer.nextFrame(buffer);
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                writer.close();
-                stagelet.getStatistics().getStatisticsMap().put(
-                        "framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex,
-                        String.valueOf(frameCount));
-            }
-        } : writer;
-    }
-
-    @Override
-    public void commitJobletInitialization(UUID jobId, JobPlan plan, JobStage stage) throws Exception {
-        final Joblet ji = getLocalJoblet(jobId);
-        Stagelet si = (Stagelet) ji.getStagelet(stage.getId());
-        for (Endpoint e : si.getEndpointList()) {
-            connectionManager.unacceptConnection(e.getEndpointId());
-        }
-        si.setEndpointList(null);
-    }
-
-    private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
-        Joblet ji = jobletMap.get(jobId);
-        if (ji == null) {
-            ji = new Joblet(this, jobId);
-            jobletMap.put(jobId, ji);
-        }
-        return ji;
-    }
-
-    public Executor getExecutor() {
-        return executor;
-    }
-
-    @Override
-    public synchronized void cleanUpJob(UUID jobId) throws Exception {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Cleaning up after job: " + jobId);
-        }
-        jobletMap.remove(jobId);
-        connectionManager.dumpStats();
-    }
-
-    @Override
-    public void startStage(UUID jobId, UUID stageId) throws Exception {
-        Joblet ji = jobletMap.get(jobId);
-        if (ji != null) {
-            Stagelet s = ji.getStagelet(stageId);
-            if (s != null) {
-                s.start();
-            }
-        }
-    }
-
-    public void notifyStageComplete(UUID jobId, UUID stageId, StageletStatistics stats) throws Exception {
-        ccs.notifyStageletComplete(jobId, stageId, id, stats);
-    }
-
-    @Override
-    public void notifyRegistration(IClusterController ccs) throws Exception {
-        this.ccs = ccs;
-    }
-
-	@Override
-	public NCConfig getConfiguration() throws Exception {
-		return ncConfig;
-	}
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
new file mode 100644
index 0000000..af60240
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -0,0 +1,561 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.Vector;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import jol.core.Runtime;
+import jol.core.Runtime.DebugLevel;
+
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.ContextHandler;
+
+import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.controller.IClusterController;
+import edu.uci.ics.hyracks.api.controller.INodeController;
+import edu.uci.ics.hyracks.api.controller.NodeParameters;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.config.CCConfig;
+import edu.uci.ics.hyracks.controller.AbstractRemoteService;
+import edu.uci.ics.hyracks.web.WebServer;
+
+public class ClusterControllerService extends AbstractRemoteService implements IClusterController {
+    private static final long serialVersionUID = 1L;
+
+    private CCConfig ccConfig;
+
+    private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName());
+
+    private final Map<String, NodeControllerState> nodeRegistry;
+
+    private WebServer webServer;
+
+    private final IJobManager jobManager;
+
+    private final Executor taskExecutor;
+
+    private final Timer timer;
+
+    private Runtime jolRuntime;
+
+    public ClusterControllerService(CCConfig ccConfig) throws Exception {
+        this.ccConfig = ccConfig;
+        nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+        Set<DebugLevel> jolDebugLevel = LOGGER.isLoggable(Level.FINE) ? Runtime.DEBUG_ALL : new HashSet<DebugLevel>();
+        jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
+        jobManager = new JOLJobManagerImpl(this, jolRuntime);
+        taskExecutor = Executors.newCachedThreadPool();
+        webServer = new WebServer(new Handler[] {
+            getAdminConsoleHandler(), getApplicationDataHandler()
+        });
+        this.timer = new Timer(true);
+    }
+
+    @Override
+    public void start() throws Exception {
+        LOGGER.log(Level.INFO, "Starting ClusterControllerService");
+        Registry registry = LocateRegistry.createRegistry(ccConfig.port);
+        registry.rebind(IClusterController.class.getName(), this);
+        webServer.setPort(ccConfig.httpPort);
+        webServer.start();
+        timer.schedule(new DeadNodeSweeper(), 0, ccConfig.heartbeatPeriod);
+        LOGGER.log(Level.INFO, "Started ClusterControllerService");
+    }
+
+    @Override
+    public void stop() throws Exception {
+        LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
+        webServer.stop();
+        LOGGER.log(Level.INFO, "Stopped ClusterControllerService");
+    }
+
+    @Override
+    public UUID createJob(JobSpecification jobSpec) throws Exception {
+        return jobManager.createJob(jobSpec, EnumSet.noneOf(JobFlag.class));
+    }
+
+    @Override
+    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        return jobManager.createJob(jobSpec, jobFlags);
+    }
+
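+    /*
+     * Registration is two-way: the node is recorded in the registry (rejecting
+     * duplicate ids), told who its cluster controller is via notifyRegistration(),
+     * and announced to the job manager so it becomes schedulable. The configured
+     * heartbeat period is returned to the node in its NodeParameters.
+     */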
+    @Override
+    public NodeParameters registerNode(INodeController nodeController) throws Exception {
+        String id = nodeController.getId();
+        NodeControllerState state = new NodeControllerState(nodeController);
+        synchronized (this) {
+            if (nodeRegistry.containsKey(id)) {
+                throw new Exception("Node with this name already registered.");
+            }
+            nodeRegistry.put(id, state);
+        }
+        nodeController.notifyRegistration(this);
+        jobManager.registerNode(id);
+        LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
+        NodeParameters params = new NodeParameters();
+        params.setHeartbeatPeriod(ccConfig.heartbeatPeriod);
+        return params;
+    }
+
+    @Override
+    public void unregisterNode(INodeController nodeController) throws Exception {
+        String id = nodeController.getId();
+        synchronized (this) {
+            nodeRegistry.remove(id);
+        }
+        LOGGER.log(Level.INFO, "Unregistered INodeController: id = " + id);
+    }
+
+    public synchronized NodeControllerState lookupNode(String id) throws Exception {
+        return nodeRegistry.get(id);
+    }
+
+    public Executor getExecutor() {
+        return taskExecutor;
+    }
+
+    public synchronized void notifyJobComplete(final UUID jobId) {
+        for (final NodeControllerState ns : nodeRegistry.values()) {
+            taskExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ns.getNodeController().cleanUpJob(jobId);
+                    } catch (Exception e) {
+                    }
+                }
+
+            });
+        }
+    }
+
+    @Override
+    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+        StageletStatistics statistics) throws Exception {
+        jobManager.notifyStageletComplete(jobId, stageId, attempt, nodeId, statistics);
+    }
+
+    @Override
+    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
+        jobManager.notifyStageletFailure(jobId, stageId, attempt, nodeId);
+    }
+
+    @Override
+    public JobStatus getJobStatus(UUID jobId) throws Exception {
+        return jobManager.getJobStatus(jobId);
+    }
+
+    @Override
+    public void start(UUID jobId) throws Exception {
+        jobManager.start(jobId);
+    }
+
+    @Override
+    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
+        return jobManager.waitForCompletion(jobId);
+    }
+
+    private Handler getAdminConsoleHandler() {
+        ContextHandler handler = new ContextHandler("/admin");
+        handler.setHandler(new AbstractHandler() {
+            @Override
+            public void handle(String target, Request baseRequest, HttpServletRequest request,
+                HttpServletResponse response) throws IOException, ServletException {
+                if (!"/".equals(target)) {
+                    return;
+                }
+                response.setContentType("text/html;charset=utf-8");
+                response.setStatus(HttpServletResponse.SC_OK);
+                baseRequest.setHandled(true);
+                PrintWriter writer = response.getWriter();
+                writer.println("<html><head><title>Hyracks Admin Console</title></head><body>");
+                writer.println("<h1>Hyracks Admin Console</h1>");
+                writer.println("<h2>Node Controllers</h2>");
+                writer.println("<table><tr><td>Node Id</td><td>Last Heartbeat Duration</td></tr>");
+                synchronized (ClusterControllerService.this) {
+                    for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+                        try {
+                            writer.print("<tr><td>");
+                            writer.print(e.getKey());
+                            writer.print("</td><td>");
+                            writer.print(e.getValue().getLastHeartbeatDuration());
+                            writer.print("</td></tr>");
+                        } catch (Exception ex) {
+                        }
+                    }
+                }
+                writer.println("</table>");
+                writer.println("</body></html>");
+                writer.flush();
+            }
+        });
+        return handler;
+    }
+
+    private Handler getApplicationDataHandler() {
+        ContextHandler handler = new ContextHandler("/applications");
+        handler.setHandler(new AbstractHandler() {
+            @Override
+            public void handle(String target, Request baseRequest, HttpServletRequest request,
+                HttpServletResponse response) throws IOException, ServletException {
+            }
+        });
+        return handler;
+    }
+
+    @Override
+    public Map<String, INodeController> getRegistry() throws Exception {
+        Map<String, INodeController> map = new HashMap<String, INodeController>();
+        for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+            map.put(e.getKey(), e.getValue().getNodeController());
+        }
+        return map;
+    }
+
+    @Override
+    public synchronized void nodeHeartbeat(String id) throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Heartbeat from: " + id);
+        }
+        NodeControllerState ns = nodeRegistry.get(id);
+        if (ns != null) {
+            ns.notifyHeartbeat();
+        }
+    }
+
+    private void killNode(String nodeId) throws Exception {
+        nodeRegistry.remove(nodeId);
+        jobManager.notifyNodeFailure(nodeId);
+    }
+
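+    /*
+     * Periodic task scheduled in start() at the heartbeat period: any node whose
+     * heartbeat lapse counter reaches ccConfig.maxHeartbeatLapsePeriods is removed
+     * from the registry and reported to the job manager as failed.
+     */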
+    private class DeadNodeSweeper extends TimerTask {
+        @Override
+        public void run() {
+            Set<String> deadNodes = new HashSet<String>();
+            synchronized (ClusterControllerService.this) {
+                for (Map.Entry<String, NodeControllerState> e : nodeRegistry.entrySet()) {
+                    NodeControllerState state = e.getValue();
+                    if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
+                        deadNodes.add(e.getKey());
+                    }
+                }
+                for (String deadNode : deadNodes) {
+                    try {
+                        if (LOGGER.isLoggable(Level.INFO)) {
+                            LOGGER.info("Killing node: " + deadNode);
+                        }
+                        killNode(deadNode);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+    }
+
+    interface RemoteOp<T> {
+        public String getNodeId();
+
+        public T execute(INodeController node) throws Exception;
+    }
+
+    interface Accumulator<T, R> {
+        public void accumulate(T o);
+
+        public R getResult();
+    }
+
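+    /*
+     * Fans the given RemoteOps out to their target node controllers on the task
+     * executor and blocks until all of them complete. The semaphore starts with one
+     * permit per operation; a permit is taken before each dispatch and released when
+     * the call finishes, so the final acquire(remoteOps.length) only succeeds once
+     * every operation is done. The first recorded exception, if any, is rethrown;
+     * results are folded through the optional Accumulator.
+     */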
+    <T, R> R runRemote(final RemoteOp<T>[] remoteOps, final Accumulator<T, R> accumulator) throws Exception {
+        final Semaphore installComplete = new Semaphore(remoteOps.length);
+        final List<Exception> errors = new Vector<Exception>();
+        for (final RemoteOp<T> remoteOp : remoteOps) {
+            NodeControllerState nodeState = lookupNode(remoteOp.getNodeId());
+            final INodeController node = nodeState.getNodeController();
+
+            installComplete.acquire();
+            Runnable remoteRunner = new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        T t = remoteOp.execute(node);
+                        if (accumulator != null) {
+                            synchronized (accumulator) {
+                                accumulator.accumulate(t);
+                            }
+                        }
+                    } catch (Exception e) {
+                        errors.add(e);
+                    } finally {
+                        installComplete.release();
+                    }
+                }
+            };
+
+            getExecutor().execute(remoteRunner);
+        }
+        installComplete.acquire(remoteOps.length);
+        if (!errors.isEmpty()) {
+            throw errors.get(0);
+        }
+        return accumulator == null ? null : accumulator.getResult();
+    }
+
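+    /*
+     * RemoteOp implementations used with runRemote(): Phase1Installer collects the
+     * port-to-endpoint map from each node, Phase2Installer pushes the merged global
+     * port map back out, Phase3Installer commits joblet initialization, StageStarter
+     * starts execution, JobletAborter aborts a stage attempt, and JobCompleteNotifier
+     * asks nodes to clean up a finished job.
+     */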
+    static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
+        private String nodeId;
+        private UUID jobId;
+        private JobPlan plan;
+        private UUID stageId;
+        private int attempt;
+        private Map<ActivityNodeId, Set<Integer>> tasks;
+        private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
+
+        public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, int attempt,
+            Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.plan = plan;
+            this.stageId = stageId;
+            this.attempt = attempt;
+            this.tasks = tasks;
+            this.opPartitions = opPartitions;
+        }
+
+        @Override
+        public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
+            return node.initializeJobletPhase1(jobId, plan, stageId, attempt, tasks, opPartitions);
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Distribution Phase 1";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    static class Phase2Installer implements RemoteOp<Void> {
+        private String nodeId;
+        private UUID jobId;
+        private JobPlan plan;
+        private UUID stageId;
+        private Map<ActivityNodeId, Set<Integer>> tasks;
+        private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
+        private Map<PortInstanceId, Endpoint> globalPortMap;
+
+        public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId,
+            Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
+            Map<PortInstanceId, Endpoint> globalPortMap) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.plan = plan;
+            this.stageId = stageId;
+            this.tasks = tasks;
+            this.opPartitions = opPartitions;
+            this.globalPortMap = globalPortMap;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.initializeJobletPhase2(jobId, plan, stageId, tasks, opPartitions, globalPortMap);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Distribution Phase 2";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    static class Phase3Installer implements RemoteOp<Void> {
+        private String nodeId;
+        private UUID jobId;
+        private UUID stageId;
+
+        public Phase3Installer(String nodeId, UUID jobId, UUID stageId) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.stageId = stageId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.commitJobletInitialization(jobId, stageId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Distribution Phase 3";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    static class StageStarter implements RemoteOp<Void> {
+        private String nodeId;
+        private UUID jobId;
+        private UUID stageId;
+
+        public StageStarter(String nodeId, UUID jobId, UUID stageId) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.stageId = stageId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.startStage(jobId, stageId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Started Stage: " + stageId;
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    static class JobletAborter implements RemoteOp<Void> {
+        private String nodeId;
+        private UUID jobId;
+        private UUID stageId;
+
+        public JobletAborter(String nodeId, UUID jobId, UUID stageId, int attempt) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+            this.stageId = stageId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.abortJoblet(jobId, stageId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Aborting";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    static class JobCompleteNotifier implements RemoteOp<Void> {
+        private String nodeId;
+        private UUID jobId;
+
+        public JobCompleteNotifier(String nodeId, UUID jobId) {
+            this.nodeId = nodeId;
+            this.jobId = jobId;
+        }
+
+        @Override
+        public Void execute(INodeController node) throws Exception {
+            node.cleanUpJob(jobId);
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return jobId + " Cleaning Up";
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+    }
+
+    static class PortMapMergingAccumulator implements
+        Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
+        Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
+
+        @Override
+        public void accumulate(Map<PortInstanceId, Endpoint> o) {
+            portMap.putAll(o);
+        }
+
+        @Override
+        public Map<PortInstanceId, Endpoint> getResult() {
+            return portMap;
+        }
+    }
+
+    @Override
+    public void createApplication(String appName) throws Exception {
+
+    }
+
+    @Override
+    public void destroyApplication(String appName) throws Exception {
+
+    }
+
+    @Override
+    public void startApplication(String appName) throws Exception {
+
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
new file mode 100644
index 0000000..e364423
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/IJobManager.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.EnumSet;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+
+public interface IJobManager {
+    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception;
+
+    public void start(UUID jobId) throws Exception;
+
+    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+        StageletStatistics statistics) throws Exception;
+
+    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception;
+
+    public JobStatus getJobStatus(UUID jobId);
+
+    public JobStatistics waitForCompletion(UUID jobId) throws Exception;
+
+    public void notifyNodeFailure(String nodeId) throws Exception;
+
+    public void registerNode(String nodeId) throws Exception;
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
new file mode 100644
index 0000000..9d7862a
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -0,0 +1,1089 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
+import jol.core.Runtime;
+import jol.types.basic.BasicTupleSet;
+import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
+import jol.types.exception.BadKeyException;
+import jol.types.exception.UpdateException;
+import jol.types.table.BasicTable;
+import jol.types.table.EventTable;
+import jol.types.table.Function;
+import jol.types.table.Key;
+import jol.types.table.TableName;
+import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.Direction;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+
+public class JOLJobManagerImpl implements IJobManager {
+    private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());
+
+    public static final String JOL_SCOPE = "hyrackscc";
+
+    private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg";
+
+    private final Runtime jolRuntime;
+
+    private final LinkedBlockingQueue<Runnable> jobQueue;
+
+    private final JobTable jobTable;
+
+    private final JobQueueThread jobQueueThread;
+
+    private final OperatorDescriptorTable odTable;
+
+    private final OperatorLocationTable olTable;
+
+    private final OperatorCloneCountTable ocTable;
+
+    private final ConnectorDescriptorTable cdTable;
+
+    private final ActivityNodeTable anTable;
+
+    private final ActivityConnectionTable acTable;
+
+    private final ActivityBlockedTable abTable;
+
+    private final JobStartTable jobStartTable;
+
+    private final JobCleanUpTable jobCleanUpTable;
+
+    private final JobCleanUpCompleteTable jobCleanUpCompleteTable;
+
+    private final StartMessageTable startMessageTable;
+
+    private final StageletCompleteTable stageletCompleteTable;
+
+    private final StageletFailureTable stageletFailureTable;
+
+    private final AvailableNodesTable availableNodesTable;
+
+    private final RankedAvailableNodesTable rankedAvailableNodesTable;
+
+    private final FailedNodesTable failedNodesTable;
+
+    private final AbortMessageTable abortMessageTable;
+
+    private final AbortNotifyTable abortNotifyTable;
+
+    private final ExpandPartitionCountConstraintTableFunction expandPartitionCountConstraintFunction;
+
+    private final List<String> rankedAvailableNodes;
+
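+    /*
+     * The constructor registers all of the tables above with the JOL catalog, hooks
+     * callbacks onto the tables that drive outbound calls to node controllers
+     * (start, clean-up, abort), and installs the declarative scheduler program
+     * (scheduler.olg) into the hyrackscc scope.
+     */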
+    public JOLJobManagerImpl(final ClusterControllerService ccs, final Runtime jolRuntime) throws Exception {
+        this.jolRuntime = jolRuntime;
+        jobQueue = new LinkedBlockingQueue<Runnable>();
+        jobQueueThread = new JobQueueThread();
+        jobQueueThread.start();
+
+        this.jobTable = new JobTable(jolRuntime);
+        this.odTable = new OperatorDescriptorTable(jolRuntime);
+        this.olTable = new OperatorLocationTable(jolRuntime);
+        this.ocTable = new OperatorCloneCountTable(jolRuntime);
+        this.cdTable = new ConnectorDescriptorTable(jolRuntime);
+        this.anTable = new ActivityNodeTable(jolRuntime);
+        this.acTable = new ActivityConnectionTable(jolRuntime);
+        this.abTable = new ActivityBlockedTable(jolRuntime);
+        this.jobStartTable = new JobStartTable();
+        this.jobCleanUpTable = new JobCleanUpTable(jolRuntime);
+        this.jobCleanUpCompleteTable = new JobCleanUpCompleteTable();
+        this.startMessageTable = new StartMessageTable(jolRuntime);
+        this.stageletCompleteTable = new StageletCompleteTable(jolRuntime);
+        this.stageletFailureTable = new StageletFailureTable(jolRuntime);
+        this.availableNodesTable = new AvailableNodesTable(jolRuntime);
+        this.rankedAvailableNodesTable = new RankedAvailableNodesTable(jolRuntime);
+        this.failedNodesTable = new FailedNodesTable(jolRuntime);
+        this.abortMessageTable = new AbortMessageTable(jolRuntime);
+        this.abortNotifyTable = new AbortNotifyTable(jolRuntime);
+        this.expandPartitionCountConstraintFunction = new ExpandPartitionCountConstraintTableFunction();
+        this.rankedAvailableNodes = new ArrayList<String>();
+
+        jolRuntime.catalog().register(jobTable);
+        jolRuntime.catalog().register(odTable);
+        jolRuntime.catalog().register(olTable);
+        jolRuntime.catalog().register(ocTable);
+        jolRuntime.catalog().register(cdTable);
+        jolRuntime.catalog().register(anTable);
+        jolRuntime.catalog().register(acTable);
+        jolRuntime.catalog().register(abTable);
+        jolRuntime.catalog().register(jobStartTable);
+        jolRuntime.catalog().register(jobCleanUpTable);
+        jolRuntime.catalog().register(jobCleanUpCompleteTable);
+        jolRuntime.catalog().register(startMessageTable);
+        jolRuntime.catalog().register(stageletCompleteTable);
+        jolRuntime.catalog().register(stageletFailureTable);
+        jolRuntime.catalog().register(availableNodesTable);
+        jolRuntime.catalog().register(rankedAvailableNodesTable);
+        jolRuntime.catalog().register(failedNodesTable);
+        jolRuntime.catalog().register(abortMessageTable);
+        jolRuntime.catalog().register(abortNotifyTable);
+        jolRuntime.catalog().register(expandPartitionCountConstraintFunction);
+
+        jobTable.register(new JobTable.Callback() {
+            @Override
+            public void deletion(TupleSet arg0) {
+                // notifyAll() must be invoked while holding the monitor of jobTable;
+                // calling it unsynchronized throws IllegalMonitorStateException.
+                synchronized (jobTable) {
+                    jobTable.notifyAll();
+                }
+            }
+
+            @Override
+            public void insertion(TupleSet arg0) {
+                synchronized (jobTable) {
+                    jobTable.notifyAll();
+                }
+            }
+        });
+
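+        // A start message tuple carries (jobId, stageId, attempt, plan, per-node task
+        // assignments). The callback below unpacks it and drives the three-phase
+        // joblet installation on the participating node controllers, then starts the
+        // stage.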
+        startMessageTable.register(new StartMessageTable.Callback() {
+            @Override
+            public void deletion(TupleSet tuples) {
+
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void insertion(TupleSet tuples) {
+                for (final Tuple t : tuples) {
+                    jobQueue.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                Object[] data = t.toArray();
+                                UUID jobId = (UUID) data[0];
+                                UUID stageId = (UUID) data[1];
+                                Integer attempt = (Integer) data[2];
+                                JobPlan plan = (JobPlan) data[3];
+                                Set<List> ts = (Set<List>) data[4];
+                                Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                    for (List l : activityInfoSet) {
+                                        Object[] lData = l.toArray();
+                                        ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                        Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
+                                        if (opParts == null) {
+                                            opParts = new HashSet<Integer>();
+                                            opPartitions.put(aid.getOperatorDescriptorId(), opParts);
+                                        }
+                                        opParts.add((Integer) lData[1]);
+                                    }
+                                }
+                                ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
+                                    .size()];
+                                int i = 0;
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+                                    Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                    for (List l : activityInfoSet) {
+                                        Object[] lData = l.toArray();
+                                        ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                        Set<Integer> aParts = tasks.get(aid);
+                                        if (aParts == null) {
+                                            aParts = new HashSet<Integer>();
+                                            tasks.put(aid, aParts);
+                                        }
+                                        aParts.add((Integer) lData[1]);
+                                    }
+                                    p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
+                                        plan, stageId, attempt, tasks, opPartitions);
+                                }
+                                LOGGER.info("Stage start - Phase 1");
+                                Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
+                                    new ClusterControllerService.PortMapMergingAccumulator());
+
+                                ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
+                                    .size()];
+                                ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
+                                    .size()];
+                                ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
+                                    .size()];
+                                i = 0;
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+                                    Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                    for (List l : activityInfoSet) {
+                                        Object[] lData = l.toArray();
+                                        ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                        Set<Integer> aParts = tasks.get(aid);
+                                        if (aParts == null) {
+                                            aParts = new HashSet<Integer>();
+                                            tasks.put(aid, aParts);
+                                        }
+                                        aParts.add((Integer) lData[1]);
+                                    }
+                                    p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
+                                        plan, stageId, tasks, opPartitions, globalPortMap);
+                                    p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
+                                        stageId);
+                                    ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId,
+                                        stageId);
+                                    ++i;
+                                }
+                                LOGGER.info("Stage start - Phase 2");
+                                ccs.runRemote(p2is, null);
+                                LOGGER.info("Stage start - Phase 3");
+                                ccs.runRemote(p3is, null);
+                                LOGGER.info("Stage start");
+                                ccs.runRemote(ss, null);
+                                LOGGER.info("Stage started");
+                            } catch (Exception e) {
+                            }
+                        }
+                    });
+                }
+            }
+        });
+
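+        // A job clean-up tuple lists the nodes that participated in a job. Each node
+        // is asked to clean up, and a JobCleanUpComplete tuple is scheduled back into
+        // the JOL program once the notifications have been attempted.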
+        jobCleanUpTable.register(new JobCleanUpTable.Callback() {
+            @Override
+            public void deletion(TupleSet tuples) {
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void insertion(TupleSet tuples) {
+                for (final Tuple t : tuples) {
+                    jobQueue.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                Object[] data = t.toArray();
+                                UUID jobId = (UUID) data[0];
+                                Set<String> ts = (Set<String>) data[1];
+                                ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
+                                    .size()];
+                                int i = 0;
+                                for (String n : ts) {
+                                    jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
+                                }
+                                try {
+                                    ccs.runRemote(jcns, null);
+                                } finally {
+                                    BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable
+                                        .createTuple(jobId));
+                                    jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
+                                    jolRuntime.evaluate();
+                                }
+                            } catch (Exception e) {
+                            }
+                        }
+                    });
+                }
+            }
+        });
+
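+        // An abort message tuple identifies a (job, stage, attempt) and the nodes
+        // running it. Each node is told to abort its joblet, and AbortNotify tuples
+        // are fed back to the JOL program once the abort round has been attempted.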
+        abortMessageTable.register(new AbortMessageTable.Callback() {
+            @Override
+            public void deletion(TupleSet tuples) {
+
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void insertion(TupleSet tuples) {
+                for (final Tuple t : tuples) {
+                    jobQueue.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                Object[] data = t.toArray();
+                                UUID jobId = (UUID) data[0];
+                                UUID stageId = (UUID) data[1];
+                                Integer attempt = (Integer) data[2];
+                                Set<List> ts = (Set<List>) data[4];
+                                ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
+                                    .size()];
+                                int i = 0;
+                                BasicTupleSet notificationTuples = new BasicTupleSet();
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    String nodeId = (String) t2Data[0];
+                                    jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId,
+                                        attempt);
+                                    notificationTuples.add(AbortNotifyTable
+                                        .createTuple(jobId, stageId, nodeId, attempt));
+                                }
+                                try {
+                                    ccs.runRemote(jas, null);
+                                } finally {
+                                    jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples,
+                                        null);
+                                    jolRuntime.evaluate();
+                                }
+                            } catch (Exception e) {
+                            }
+                        }
+                    });
+                }
+            }
+        });
+
+        jolRuntime.install(JOL_SCOPE, ClassLoader.getSystemResource(SCHEDULER_OLG_FILE));
+        jolRuntime.evaluate();
+    }
+
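+    /*
+     * Creating a job materializes the specification into JOL tuples: one tuple per
+     * operator descriptor, connector, activity node, activity connection, blocking
+     * edge, and partition-constraint expansion, plus an initial job tuple. The tuples
+     * are scheduled into the hyrackscc scope and evaluated; from there the scheduler
+     * program installed in the constructor drives stage planning and dispatch.
+     */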
+    @Override
+    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        final UUID jobId = UUID.randomUUID();
+
+        final JobPlanBuilder builder = new JobPlanBuilder();
+        builder.init(jobSpec, jobFlags);
+
+        final BasicTupleSet anTuples = new BasicTupleSet();
+        final BasicTupleSet acTuples = new BasicTupleSet();
+        final BasicTupleSet abTuples = new BasicTupleSet();
+        IActivityGraphBuilder gBuilder = new IActivityGraphBuilder() {
+            @Override
+            public void addTask(IActivityNode task) {
+                anTuples.add(ActivityNodeTable.createTuple(jobId, task));
+                builder.addTask(task);
+            }
+
+            @Override
+            public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
+                acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.OUTPUT, operatorOutputIndex,
+                    taskOutputIndex));
+                builder.addTargetEdge(operatorOutputIndex, task, taskOutputIndex);
+            }
+
+            @Override
+            public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
+                acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.INPUT, operatorInputIndex,
+                    taskInputIndex));
+                builder.addSourceEdge(operatorInputIndex, task, taskInputIndex);
+            }
+
+            @Override
+            public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
+                abTuples.add(ActivityBlockedTable.createTuple(jobId, blocker, blocked));
+                builder.addBlockingEdge(blocker, blocked);
+            }
+        };
+
+        BasicTupleSet odTuples = new BasicTupleSet();
+        BasicTupleSet olTuples = new BasicTupleSet();
+        BasicTupleSet ocTuples = new BasicTupleSet();
+        for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : jobSpec.getOperatorMap().entrySet()) {
+            IOperatorDescriptor od = e.getValue();
+            int nPartitions = addPartitionConstraintTuples(jobId, od, olTuples, ocTuples);
+            odTuples.add(OperatorDescriptorTable.createTuple(jobId, nPartitions, od));
+            od.contributeTaskGraph(gBuilder);
+        }
+
+        BasicTupleSet cdTuples = new BasicTupleSet();
+        for (Map.Entry<ConnectorDescriptorId, IConnectorDescriptor> e : jobSpec.getConnectorMap().entrySet()) {
+            cdTuples.add(ConnectorDescriptorTable.createTuple(jobId, jobSpec, e.getValue()));
+        }
+
+        BasicTupleSet jobTuples = new BasicTupleSet(JobTable.createInitialJobTuple(jobId, jobSpec, builder.getPlan()));
+
+        jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, odTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, OperatorLocationTable.TABLE_NAME, olTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, OperatorCloneCountTable.TABLE_NAME, ocTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ConnectorDescriptorTable.TABLE_NAME, cdTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ActivityNodeTable.TABLE_NAME, anTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ActivityConnectionTable.TABLE_NAME, acTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, ActivityBlockedTable.TABLE_NAME, abTuples, null);
+
+        jolRuntime.evaluate();
+
+        return jobId;
+    }
+
+    private int addPartitionConstraintTuples(UUID jobId, IOperatorDescriptor od, BasicTupleSet olTuples,
+        BasicTupleSet ocTuples) {
+        PartitionConstraint pc = od.getPartitionConstraint();
+
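+        // A COUNT constraint only fixes how many clones of the operator run, while an
+        // EXPLICIT constraint additionally pins each partition to candidate node locations.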
+        switch (pc.getPartitionConstraintType()) {
+            case COUNT:
+                int count = ((PartitionCountConstraint) pc).getCount();
+                ocTuples.add(OperatorCloneCountTable.createTuple(jobId, od.getOperatorId(), count));
+                return count;
+
+            case EXPLICIT:
+                LocationConstraint[] locationConstraints = ((ExplicitPartitionConstraint) pc).getLocationConstraints();
+                for (int i = 0; i < locationConstraints.length; ++i) {
+                    addLocationConstraintTuple(olTuples, jobId, od.getOperatorId(), i, locationConstraints[i], 0);
+                }
+                return locationConstraints.length;
+        }
+        throw new IllegalArgumentException("Unknown partition constraint type: " + pc.getPartitionConstraintType());
+    }
+
+    private void addLocationConstraintTuple(BasicTupleSet olTuples, UUID jobId, OperatorDescriptorId opId, int i,
+        LocationConstraint locationConstraint, int benefit) {
+        switch (locationConstraint.getConstraintType()) {
+            case ABSOLUTE:
+                String nodeId = ((AbsoluteLocationConstraint) locationConstraint).getLocationId();
+                olTuples.add(OperatorLocationTable.createTuple(jobId, opId, nodeId, i, benefit));
+                break;
+
+            case CHOICE:
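+                // Expand a choice constraint into one location tuple per alternative,
+                // giving earlier alternatives a higher benefit value.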
+                int index = 0;
+                for (LocationConstraint lc : ((ChoiceLocationConstraint) locationConstraint).getChoices()) {
+                    addLocationConstraintTuple(olTuples, jobId, opId, i, lc, benefit - index);
+                    index++;
+                }
+        }
+    }
+
+    @Override
+    public JobStatus getJobStatus(UUID jobId) {
+        synchronized (jobTable) {
+            try {
+                Tuple jobTuple = jobTable.lookupJob(jobId);
+                if (jobTuple == null) {
+                    return null;
+                }
+                return (JobStatus) jobTuple.value(1);
+            } catch (BadKeyException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void notifyNodeFailure(String nodeId) throws Exception {
+        int len = rankedAvailableNodes.size();
+        int delIndex = -1;
+        for (int i = 0; i < len; ++i) {
+            if (nodeId.equals(rankedAvailableNodes.get(i))) {
+                delIndex = i;
+                break;
+            }
+        }
+        if (delIndex < 0) {
+            return;
+        }
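+        // Retract the failed node's rank tuple and shift every node ranked after it up by
+        // one position so the ranking stays contiguous.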
+        BasicTupleSet delRANTuples = new BasicTupleSet();
+        delRANTuples.add(RankedAvailableNodesTable.createTuple(nodeId, delIndex));
+
+        BasicTupleSet insRANTuples = new BasicTupleSet();
+        for (int i = delIndex + 1; i < len; ++i) {
+            insRANTuples.add(RankedAvailableNodesTable.createTuple(rankedAvailableNodes.get(i), i - 1));
+        }
+
+        rankedAvailableNodes.remove(delIndex);
+
+        jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, delRANTuples);
+
+        BasicTupleSet unavailableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, null, unavailableTuples);
+
+        jolRuntime.evaluate();
+
+        BasicTupleSet failedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, failedTuples, null);
+
+        jolRuntime.evaluate();
+    }
+
+    @Override
+    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+        StageletStatistics statistics) throws Exception {
+        BasicTupleSet scTuples = new BasicTupleSet();
+        scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics));
+
+        jolRuntime.schedule(JOL_SCOPE, StageletCompleteTable.TABLE_NAME, scTuples, null);
+
+        jolRuntime.evaluate();
+    }
+
+    @Override
+    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
+        BasicTupleSet sfTuples = new BasicTupleSet();
+        sfTuples.add(StageletFailureTable.createTuple(jobId, stageId, nodeId, attempt));
+
+        jolRuntime.schedule(JOL_SCOPE, StageletFailureTable.TABLE_NAME, sfTuples, null);
+
+        jolRuntime.evaluate();
+    }
+
+    @Override
+    public void start(UUID jobId) throws Exception {
+        BasicTupleSet jsTuples = new BasicTupleSet();
+        jsTuples.add(JobStartTable.createTuple(jobId, System.currentTimeMillis()));
+
+        jolRuntime.schedule(JOL_SCOPE, JobStartTable.TABLE_NAME, jsTuples, null);
+
+        jolRuntime.evaluate();
+    }
+
+    @Override
+    public synchronized void registerNode(String nodeId) throws Exception {
+        rankedAvailableNodes.add(nodeId);
+        BasicTupleSet insRANTuples = new BasicTupleSet();
+        insRANTuples.add(RankedAvailableNodesTable.createTuple(nodeId, rankedAvailableNodes.size() - 1));
+
+        jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, null);
+
+        BasicTupleSet availableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, availableTuples, null);
+
+        jolRuntime.evaluate();
+
+        BasicTupleSet unfailedTuples = new BasicTupleSet(FailedNodesTable.createTuple(nodeId));
+
+        jolRuntime.schedule(JOL_SCOPE, FailedNodesTable.TABLE_NAME, null, unfailedTuples);
+
+        jolRuntime.evaluate();
+    }
+
+    @Override
+    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
+        synchronized (jobTable) {
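+            // Wait on the job table's monitor until the job tuple reports TERMINATED
+            // (or the tuple disappears from the table).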
+            Tuple jobTuple = null;
+            while ((jobTuple = jobTable.lookupJob(jobId)) != null && jobTuple.value(1) != JobStatus.TERMINATED) {
+                jobTable.wait();
+            }
+            return jobTuple == null ? null : jobTable.buildJobStatistics(jobTuple);
+        }
+    }
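+
+    /*
+     * Sketch of the expected call sequence from a client of this scheduler (the
+     * "scheduler" variable name is illustrative only):
+     *
+     *   UUID jobId = scheduler.createJob(jobSpec, EnumSet.noneOf(JobFlag.class));
+     *   scheduler.start(jobId);
+     *   JobStatistics stats = scheduler.waitForCompletion(jobId);
+     */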
+
+    /*
+     * declare(job, keys(0), {JobId, Status, JobSpec, JobPlan, Set<Set<StageletStatistics>>})
+     */
+    private static class JobTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "job");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, JobStatus.class, JobSpecification.class, JobPlan.class, Set.class
+        };
+
+        public JobTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        @SuppressWarnings("unchecked")
+        static Tuple createInitialJobTuple(UUID jobId, JobSpecification jobSpec, JobPlan plan) {
+            return new Tuple(jobId, JobStatus.INITIALIZED, jobSpec, plan, new HashSet());
+        }
+
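+        /*
+         * Folds the per-stagelet statistics accumulated in the job tuple into per-stage
+         * statistics and an overall JobStatistics object.
+         */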
+        @SuppressWarnings("unchecked")
+        JobStatistics buildJobStatistics(Tuple jobTuple) {
+            Set<Set<StageletStatistics>> statsSet = (Set<Set<StageletStatistics>>) jobTuple.value(4);
+            JobStatistics stats = new JobStatistics();
+            if (statsSet != null) {
+                for (Set<StageletStatistics> stageStatsSet : statsSet) {
+                    StageStatistics stageStats = new StageStatistics();
+                    for (StageletStatistics stageletStats : stageStatsSet) {
+                        stageStats.addStageletStatistics(stageletStats);
+                    }
+                    stats.addStageStatistics(stageStats);
+                }
+            }
+            return stats;
+        }
+
+        Tuple lookupJob(UUID jobId) throws BadKeyException {
+            TupleSet set = primary().lookupByKey(jobId);
+            if (set.isEmpty()) {
+                return null;
+            }
+            return (Tuple) set.toArray()[0];
+        }
+    }
+
+    /*
+     * declare(operatordescriptor, keys(0, 1), {JobId, ODId, NumPartitions, OperatorDescriptor})
+     */
+    private static class OperatorDescriptorTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatordescriptor");
+
+        private static Key PRIMARY_KEY = new Key(0, 1);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, Integer.class, IOperatorDescriptor.class
+        };
+
+        public OperatorDescriptorTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, int nPartitions, IOperatorDescriptor od) {
+            return new Tuple(jobId, od.getOperatorId(), nPartitions, od);
+        }
+    }
+
+    /*
+     * declare(operatorlocation, keys(), {JobId, ODId, NodeId, Partition, Benefit})
+     */
+    private static class OperatorLocationTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorlocation");
+
+        private static Key PRIMARY_KEY = new Key();
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, String.class, Integer.class, Integer.class
+        };
+
+        public OperatorLocationTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId, int partition, int benefit) {
+            return new Tuple(jobId, opId, nodeId, partition, benefit);
+        }
+    }
+
+    /*
+     * declare(operatorclonecount, keys(0, 1), {JobId, ODId, Count})
+     */
+    private static class OperatorCloneCountTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorclonecount");
+
+        private static Key PRIMARY_KEY = new Key();
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, Integer.class
+        };
+
+        public OperatorCloneCountTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, int cloneCount) {
+            return new Tuple(jobId, opId, cloneCount);
+        }
+    }
+
+    /*
+     * declare(connectordescriptor, keys(0, 1), {JobId, CDId, SrcODId, SrcPort, DestODId, DestPort, ConnectorDescriptor})
+     */
+    private static class ConnectorDescriptorTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "connectordescriptor");
+
+        private static Key PRIMARY_KEY = new Key(0, 1);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class,
+            ConnectorDescriptorId.class,
+            OperatorDescriptorId.class,
+            Integer.class,
+            OperatorDescriptorId.class,
+            Integer.class,
+            IConnectorDescriptor.class
+        };
+
+        public ConnectorDescriptorTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, JobSpecification jobSpec, IConnectorDescriptor conn) {
+            IOperatorDescriptor srcOD = jobSpec.getProducer(conn);
+            int srcPort = jobSpec.getProducerOutputIndex(conn);
+            IOperatorDescriptor destOD = jobSpec.getConsumer(conn);
+            int destPort = jobSpec.getConsumerInputIndex(conn);
+            Tuple cdTuple = new Tuple(jobId, conn.getConnectorId(), srcOD.getOperatorId(), srcPort, destOD
+                .getOperatorId(), destPort, conn);
+            return cdTuple;
+        }
+    }
+
+    /*
+     * declare(activitynode, keys(0, 1, 2), {JobId, OperatorId, ActivityId, ActivityNode})
+     */
+    private static class ActivityNodeTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activitynode");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, ActivityNodeId.class, IActivityNode.class
+        };
+
+        public ActivityNodeTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, IActivityNode aNode) {
+            return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), aNode.getActivityNodeId(),
+                aNode);
+        }
+    }
+
+    /*
+     * declare(activityconnection, keys(0, 1, 2, 3), {JobId, OperatorId, OperatorPort, Direction, ActivityNodeId, ActivityPort})
+     */
+    private static class ActivityConnectionTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityconnection");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, Integer.class, Direction.class, ActivityNodeId.class, Integer.class
+        };
+
+        public ActivityConnectionTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, IActivityNode aNode, Direction direction, int odPort, int activityPort) {
+            return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), odPort, direction, aNode
+                .getActivityNodeId(), activityPort);
+        }
+    }
+
+    /*
+     * declare(activityblocked, keys(0, 1, 2, 3, 4), {JobId, BlockerOperatorId, BlockerActivityId, BlockedOperatorId, BlockedActivityId})
+     */
+    private static class ActivityBlockedTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "activityblocked");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3, 4);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class,
+            OperatorDescriptorId.class,
+            ActivityNodeId.class,
+            OperatorDescriptorId.class,
+            ActivityNodeId.class
+        };
+
+        public ActivityBlockedTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, IActivityNode blocker, IActivityNode blocked) {
+            ActivityNodeId blockerANId = blocker.getActivityNodeId();
+            OperatorDescriptorId blockerODId = blockerANId.getOperatorDescriptorId();
+            ActivityNodeId blockedANId = blocked.getActivityNodeId();
+            OperatorDescriptorId blockedODId = blockedANId.getOperatorDescriptorId();
+            return new Tuple(jobId, blockerODId, blockerANId, blockedODId, blockedANId);
+        }
+    }
+
+    /*
+     * declare(jobstart, keys(0), {JobId, SubmitTime})
+     */
+    private static class JobStartTable extends EventTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobstart");
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, Long.class
+        };
+
+        public JobStartTable() {
+            super(TABLE_NAME, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, long submitTime) {
+            return new Tuple(jobId, submitTime);
+        }
+    }
+
+    /*
+     * declare(startmessage, keys(0, 1, 2), {JobId, StageId, Attempt, JobPlan, TupleSet})
+     */
+    private static class StartMessageTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "startmessage");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, UUID.class, Integer.class, JobPlan.class, Set.class
+        };
+
+        public StartMessageTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+    }
+
+    /*
+     * declare(jobcleanup, keys(0), {JobId, Set<NodeId>})
+     */
+    private static class JobCleanUpTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanup");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, Set.class
+        };
+
+        public JobCleanUpTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+    }
+
+    /*
+     * declare(jobcleanupcomplete, keys(0), {JobId})
+     */
+    private static class JobCleanUpCompleteTable extends EventTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanupcomplete");
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class
+        };
+
+        public JobCleanUpCompleteTable() {
+            super(TABLE_NAME, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId) {
+            return new Tuple(jobId);
+        }
+    }
+
+    /*
+     * declare(stageletcomplete, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt, StageletStatistics})
+     */
+    private static class StageletCompleteTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletcomplete");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, UUID.class, String.class, Integer.class, StageletStatistics.class
+        };
+
+        public StageletCompleteTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt,
+            StageletStatistics statistics) {
+            return new Tuple(jobId, stageId, nodeId, attempt, statistics);
+        }
+    }
+
+    /*
+     * declare(stageletfailure, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
+     */
+    private static class StageletFailureTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "stageletfailure");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, UUID.class, String.class, Integer.class
+        };
+
+        public StageletFailureTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) {
+            return new Tuple(jobId, stageId, nodeId, attempt);
+        }
+    }
+
+    /*
+     * declare(availablenodes, keys(0), {NodeId})
+     */
+    private static class AvailableNodesTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "availablenodes");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            String.class
+        };
+
+        public AvailableNodesTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(String nodeId) {
+            return new Tuple(nodeId);
+        }
+    }
+
+    /*
+     * declare(rankedavailablenodes, keys(0), {NodeId, Rank})
+     */
+    private static class RankedAvailableNodesTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "rankedavailablenodes");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            String.class, Integer.class
+        };
+
+        public RankedAvailableNodesTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(String nodeId, int rank) {
+            return new Tuple(nodeId, rank);
+        }
+    }
+
+    /*
+     * declare(failednodes, keys(0), {NodeId})
+     */
+    private static class FailedNodesTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "failednodes");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            String.class
+        };
+
+        public FailedNodesTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(String nodeId) {
+            return new Tuple(nodeId);
+        }
+    }
+
+    /*
+     * declare(abortmessage, keys(0, 1, 2), {JobId, StageId, Attempt, JobPlan, TupleSet})
+     */
+    private static class AbortMessageTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortmessage");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, UUID.class, Integer.class, JobPlan.class, Set.class
+        };
+
+        public AbortMessageTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+    }
+
+    /*
+     * declare(abortnotify, keys(0, 1, 2, 3), {JobId, StageId, NodeId, Attempt})
+     */
+    private static class AbortNotifyTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "abortnotify");
+
+        private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, UUID.class, String.class, Integer.class
+        };
+
+        public AbortNotifyTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt) {
+            return new Tuple(jobId, stageId, nodeId, attempt);
+        }
+    }
+
+    private static class ExpandPartitionCountConstraintTableFunction extends Function {
+        private static final String TABLE_NAME = "expandpartitioncountconstraint";
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, Integer.class, Integer.class
+        };
+
+        public ExpandPartitionCountConstraintTableFunction() {
+            super(TABLE_NAME, SCHEMA);
+        }
+
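+        // Expands each (jobId, operatorId, partitionCount) input tuple into one output
+        // tuple per partition, tagging each with a running ordinal across the batch.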
+        @Override
+        public TupleSet insert(TupleSet tuples, TupleSet conflicts) throws UpdateException {
+            TupleSet result = new BasicTupleSet();
+            int counter = 0;
+            for (Tuple t : tuples) {
+                int nPartitions = (Integer) t.value(2);
+                for (int i = 0; i < nPartitions; ++i) {
+                    result.add(new Tuple(t.value(0), t.value(1), i, counter++));
+                }
+            }
+            return result;
+        }
+    }
+
+    private class JobQueueThread extends Thread {
+        public JobQueueThread() {
+            setDaemon(true);
+        }
+
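+        // Drains the job queue forever; interruptions are retried and exceptions thrown
+        // by a task are printed without killing the thread.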
+        @Override
+        public void run() {
+            Runnable r;
+            while (true) {
+                try {
+                    r = jobQueue.take();
+                } catch (InterruptedException e) {
+                    continue;
+                }
+                try {
+                    r.run();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
new file mode 100644
index 0000000..d748a13
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanBuilder.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobPlanBuilder implements IActivityGraphBuilder {
+    private static final Logger LOGGER = Logger.getLogger(JobPlanBuilder.class.getName());
+
+    private JobPlan plan;
+
+    @Override
+    public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
+        addToValueSet(plan.getBlocker2BlockedMap(), blocker.getActivityNodeId(), blocked.getActivityNodeId());
+        addToValueSet(plan.getBlocked2BlockerMap(), blocked.getActivityNodeId(), blocker.getActivityNodeId());
+    }
+
+    @Override
+    public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("Adding source edge: " + task.getOwner().getOperatorId() + ":" + operatorInputIndex + " -> "
+                + task.getActivityNodeId() + ":" + taskInputIndex);
+        }
+        insertIntoIndexedMap(plan.getTaskInputMap(), task.getActivityNodeId(), taskInputIndex, operatorInputIndex);
+        insertIntoIndexedMap(plan.getOperatorInputMap(), task.getOwner().getOperatorId(), operatorInputIndex, task
+            .getActivityNodeId());
+    }
+
+    @Override
+    public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
+        if (LOGGER.isLoggable(Level.FINEST)) {
+            LOGGER.finest("Adding target edge: " + task.getOwner().getOperatorId() + ":" + operatorOutputIndex + " -> "
+                + task.getActivityNodeId() + ":" + taskOutputIndex);
+        }
+        insertIntoIndexedMap(plan.getTaskOutputMap(), task.getActivityNodeId(), taskOutputIndex, operatorOutputIndex);
+        insertIntoIndexedMap(plan.getOperatorOutputMap(), task.getOwner().getOperatorId(), operatorOutputIndex, task
+            .getActivityNodeId());
+    }
+
+    @Override
+    public void addTask(IActivityNode task) {
+        plan.getActivityNodeMap().put(task.getActivityNodeId(), task);
+        addToValueSet(plan.getOperatorTaskMap(), task.getOwner().getOperatorId(), task.getActivityNodeId());
+    }
+
+    private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
+        Set<V> targets = map.get(n1);
+        if (targets == null) {
+            targets = new HashSet<V>();
+            map.put(n1, targets);
+        }
+        targets.add(n2);
+    }
+
+    private <T> void extend(List<T> list, int index) {
+        int n = list.size();
+        for (int i = n; i <= index; ++i) {
+            list.add(null);
+        }
+    }
+
+    public void init(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+        plan = new JobPlan(jobSpec, jobFlags);
+    }
+
+    private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
+        List<V> vList = map.get(key);
+        if (vList == null) {
+            vList = new ArrayList<V>();
+            map.put(key, vList);
+        }
+        extend(vList, index);
+        vList.set(index, value);
+    }
+
+    public JobPlan getPlan() {
+        return plan;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobPlanBuilder.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java
similarity index 66%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobPlanBuilder.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java
index 61cf70f..f992de1 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobPlanBuilder.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JobPlanner.java
@@ -12,9 +12,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.clustercontroller;
 
-import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -25,7 +24,6 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
 import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -38,60 +36,8 @@
 import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
 import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
 
-public class JobPlanBuilder implements IActivityGraphBuilder {
-    private static final Logger LOGGER = Logger.getLogger(JobPlanBuilder.class.getName());
-
-    private JobPlan plan;
-
-    @Override
-    public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
-        addToValueSet(plan.getBlocker2BlockedMap(), blocker.getActivityNodeId(), blocked.getActivityNodeId());
-        addToValueSet(plan.getBlocked2BlockerMap(), blocked.getActivityNodeId(), blocker.getActivityNodeId());
-    }
-
-    @Override
-    public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
-        if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("Adding source edge: " + task.getOwner().getOperatorId() + ":" + operatorInputIndex + " -> "
-                    + task.getActivityNodeId() + ":" + taskInputIndex);
-        }
-        insertIntoIndexedMap(plan.getTaskInputMap(), task.getActivityNodeId(), taskInputIndex, operatorInputIndex);
-        insertIntoIndexedMap(plan.getOperatorInputMap(), task.getOwner().getOperatorId(), operatorInputIndex, task
-                .getActivityNodeId());
-    }
-
-    @Override
-    public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
-        if (LOGGER.isLoggable(Level.FINEST)) {
-            LOGGER.finest("Adding target edge: " + task.getOwner().getOperatorId() + ":" + operatorOutputIndex + " -> "
-                    + task.getActivityNodeId() + ":" + taskOutputIndex);
-        }
-        insertIntoIndexedMap(plan.getTaskOutputMap(), task.getActivityNodeId(), taskOutputIndex, operatorOutputIndex);
-        insertIntoIndexedMap(plan.getOperatorOutputMap(), task.getOwner().getOperatorId(), operatorOutputIndex, task
-                .getActivityNodeId());
-    }
-
-    @Override
-    public void addTask(IActivityNode task) {
-        plan.getActivityNodeMap().put(task.getActivityNodeId(), task);
-        addToValueSet(plan.getOperatorTaskMap(), task.getOwner().getOperatorId(), task.getActivityNodeId());
-    }
-
-    private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
-        Set<V> targets = map.get(n1);
-        if (targets == null) {
-            targets = new HashSet<V>();
-            map.put(n1, targets);
-        }
-        targets.add(n2);
-    }
-
-    private <T> void extend(List<T> list, int index) {
-        int n = list.size();
-        for (int i = n; i <= index; ++i) {
-            list.add(null);
-        }
-    }
+public class JobPlanner {
+    private static final Logger LOGGER = Logger.getLogger(JobPlanner.class.getName());
 
     private Pair<ActivityNodeId, ActivityNodeId> findMergePair(JobPlan plan, JobSpecification spec, Set<JobStage> eqSets) {
         Map<ActivityNodeId, IActivityNode> activityNodeMap = plan.getActivityNodeMap();
@@ -127,12 +73,11 @@
         return null;
     }
 
-    private JobStage inferStages() throws Exception {
+    private JobStage inferStages(JobPlan plan) throws Exception {
         JobSpecification spec = plan.getJobSpecification();
 
         /*
-         * Build initial equivalence sets map. We create a map such that for
-         * each IOperatorTask, t -> { t }
+         * Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
          */
         Map<ActivityNodeId, JobStage> stageMap = new HashMap<ActivityNodeId, JobStage>();
         Set<JobStage> stages = new HashSet<JobStage>();
@@ -185,22 +130,8 @@
         return endStage;
     }
 
-    public void init(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
-        plan = new JobPlan(jobSpec, jobFlags);
-    }
-
-    private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
-        List<V> vList = map.get(key);
-        if (vList == null) {
-            vList = new ArrayList<V>();
-            map.put(key, vList);
-        }
-        extend(vList, index);
-        vList.set(index, value);
-    }
-
     private void merge(Map<ActivityNodeId, JobStage> eqSetMap, Set<JobStage> eqSets, ActivityNodeId t1,
-            ActivityNodeId t2) {
+        ActivityNodeId t2) {
         JobStage stage1 = eqSetMap.get(t1);
         Set<ActivityNodeId> s1 = stage1.getTasks();
         JobStage stage2 = eqSetMap.get(t2);
@@ -220,14 +151,17 @@
         }
     }
 
-    public JobPlan plan() throws Exception {
-        PlanUtils.visit(plan.getJobSpecification(), new IOperatorDescriptorVisitor() {
+    public JobPlan plan(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+        final JobPlanBuilder builder = new JobPlanBuilder();
+        builder.init(jobSpec, jobFlags);
+        PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
             @Override
             public void visit(IOperatorDescriptor op) throws Exception {
-                op.contributeTaskGraph(JobPlanBuilder.this);
+                op.contributeTaskGraph(builder);
             }
         });
-        JobStage endStage = inferStages();
+        JobPlan plan = builder.getPlan();
+        JobStage endStage = inferStages(plan);
         plan.setEndStage(endStage);
 
         return plan;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java
new file mode 100644
index 0000000..e209515
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/NodeControllerState.java
@@ -0,0 +1,29 @@
+package edu.uci.ics.hyracks.controller.clustercontroller;
+
+import edu.uci.ics.hyracks.api.controller.INodeController;
+
+public class NodeControllerState {
+    private final INodeController nodeController;
+
+    private int lastHeartbeatDuration;
+
+    public NodeControllerState(INodeController nodeController) {
+        this.nodeController = nodeController;
+    }
+
+    void notifyHeartbeat() {
+        lastHeartbeatDuration = 0;
+    }
+
+    int incrementLastHeartbeatDuration() {
+        return lastHeartbeatDuration++;
+    }
+
+    int getLastHeartbeatDuration() {
+        return lastHeartbeatDuration;
+    }
+
+    public INodeController getNodeController() {
+        return nodeController;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/StageProgress.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
similarity index 96%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/StageProgress.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
index 3357012..69f7380 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/StageProgress.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/StageProgress.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.clustercontroller;
 
 import java.util.HashSet;
 import java.util.Set;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Joblet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
similarity index 86%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Joblet.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
index 5f38e56..7f0a44e 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Joblet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Joblet.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.nodecontroller;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -23,7 +23,6 @@
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.controller.NodeControllerService;
 
 public class Joblet {
     private static final long serialVersionUID = 1L;
@@ -88,8 +87,17 @@
         return nodeController.getExecutor();
     }
 
-    public synchronized void notifyStageletComplete(UUID stageId, StageletStatistics stats) throws Exception {
+    public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletStatistics stats) throws Exception {
         stageletMap.remove(stageId);
-        nodeController.notifyStageComplete(jobId, stageId, stats);
+        nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
+    }
+
+    public void notifyStageletFailed(UUID stageId, int attempt) throws Exception {
+        stageletMap.remove(stageId);
+        nodeController.notifyStageFailed(jobId, stageId, attempt);
+    }
+
+    public NodeControllerService getNodeController() {
+        return nodeController;
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
new file mode 100644
index 0000000..195fe00
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -0,0 +1,427 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.controller.nodecontroller;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import edu.uci.ics.hyracks.api.comm.Endpoint;
+import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.controller.IClusterController;
+import edu.uci.ics.hyracks.api.controller.INodeController;
+import edu.uci.ics.hyracks.api.controller.NodeCapability;
+import edu.uci.ics.hyracks.api.controller.NodeParameters;
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.Direction;
+import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IEndpointDataWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorInstanceId;
+import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
+import edu.uci.ics.hyracks.comm.ConnectionManager;
+import edu.uci.ics.hyracks.comm.DemuxDataReceiveListenerFactory;
+import edu.uci.ics.hyracks.config.NCConfig;
+import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.controller.AbstractRemoteService;
+import edu.uci.ics.hyracks.runtime.OperatorRunnable;
+
+public class NodeControllerService extends AbstractRemoteService implements INodeController {
+    private static final long serialVersionUID = 1L;
+
+    private NCConfig ncConfig;
+
+    private final String id;
+
+    private final HyracksContext ctx;
+
+    private final NodeCapability nodeCapability;
+
+    private final ConnectionManager connectionManager;
+
+    private final Timer timer;
+
+    private IClusterController ccs;
+
+    private Map<UUID, Joblet> jobletMap;
+
+    private Executor executor;
+
+    private NodeParameters nodeParameters;
+
+    public NodeControllerService(NCConfig ncConfig) throws Exception {
+        this.ncConfig = ncConfig;
+        id = ncConfig.nodeId;
+        this.ctx = new HyracksContext(ncConfig.frameSize);
+        if (id == null) {
+            throw new Exception("id not set");
+        }
+        nodeCapability = computeNodeCapability();
+        connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
+        jobletMap = new HashMap<UUID, Joblet>();
+        executor = Executors.newCachedThreadPool();
+        timer = new Timer(true);
+    }
+
+    private static Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
+
+    @Override
+    public void start() throws Exception {
+        LOGGER.log(Level.INFO, "Starting NodeControllerService");
+        connectionManager.start();
+        Registry registry = LocateRegistry.getRegistry(ncConfig.ccHost, ncConfig.ccPort);
+        IClusterController cc = (IClusterController) registry.lookup(IClusterController.class.getName());
+        this.nodeParameters = cc.registerNode(this);
+
+        // Schedule heartbeat generator.
+        timer.schedule(new HeartbeatTask(cc), 0, nodeParameters.getHeartbeatPeriod());
+
+        LOGGER.log(Level.INFO, "Started NodeControllerService");
+    }
+
+    @Override
+    public void stop() throws Exception {
+        LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+        connectionManager.stop();
+        LOGGER.log(Level.INFO, "Stopped NodeControllerService");
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public NodeCapability getNodeCapability() throws Exception {
+        return nodeCapability;
+    }
+
+    public ConnectionManager getConnectionManager() {
+        return connectionManager;
+    }
+
+    private static NodeCapability computeNodeCapability() {
+        NodeCapability nc = new NodeCapability();
+        nc.setCPUCount(Runtime.getRuntime().availableProcessors());
+        return nc;
+    }
+
+    private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
+        String ipaddrStr = ncConfig.dataIPAddress;
+        ipaddrStr = ipaddrStr.trim();
+        Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
+        Matcher m = pattern.matcher(ipaddrStr);
+        if (!m.matches()) {
+            throw new Exception(MessageFormat.format(
+                "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
+        }
+        byte[] ipBytes = new byte[4];
+        ipBytes[0] = (byte) Integer.parseInt(m.group(1));
+        ipBytes[1] = (byte) Integer.parseInt(m.group(2));
+        ipBytes[2] = (byte) Integer.parseInt(m.group(3));
+        ipBytes[3] = (byte) Integer.parseInt(m.group(4));
+        return InetAddress.getByAddress(ipBytes);
+    }
+
+    @Override
+    public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
+        Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) throws Exception {
+        LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
+
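+        // Phase 1: create the stagelet and its operator runtimes, and open a listening
+        // endpoint for every input port; the returned port map is later used to wire senders.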
+        final Joblet joblet = getLocalJoblet(jobId);
+
+        Stagelet stagelet = new Stagelet(joblet, stageId, attempt, id);
+        joblet.setStagelet(stageId, stagelet);
+
+        final Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
+        Map<OperatorInstanceId, OperatorRunnable> honMap = stagelet.getOperatorMap();
+
+        List<Endpoint> endpointList = new ArrayList<Endpoint>();
+
+        for (ActivityNodeId hanId : tasks.keySet()) {
+            IActivityNode han = plan.getActivityNodeMap().get(hanId);
+            if (LOGGER.isLoggable(Level.FINEST)) {
+                LOGGER.finest("Initializing " + hanId + " -> " + han);
+            }
+            IOperatorDescriptor op = han.getOwner();
+            List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
+            for (int i : tasks.get(hanId)) {
+                IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i);
+                OperatorRunnable or = new OperatorRunnable(ctx, hon);
+                stagelet.setOperator(op.getOperatorId(), i, or);
+                if (inputs != null) {
+                    for (int j = 0; j < inputs.size(); ++j) {
+                        if (j >= 1) {
+                            throw new IllegalStateException();
+                        }
+                        IConnectorDescriptor conn = inputs.get(j);
+                        OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+                            .getOperatorId();
+                        OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+                            .getOperatorId();
+                        Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
+                        endpointList.add(endpoint);
+                        DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId, stageId);
+                        connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
+                        PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
+                            .getTaskInputMap().get(hanId).get(j), i);
+                        if (LOGGER.isLoggable(Level.FINEST)) {
+                            LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
+                        }
+                        portMap.put(piId, endpoint);
+                        IFrameReader reader = createReader(conn, drlf, i, plan, stagelet, opPartitions
+                            .get(producerOpId).size(), opPartitions.get(consumerOpId).size());
+                        or.setFrameReader(reader);
+                    }
+                }
+                honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
+            }
+        }
+
+        stagelet.setEndpointList(endpointList);
+
+        return portMap;
+    }
+
+    private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
+        final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
+        throws HyracksDataException {
+        final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex, nProducerCount,
+            nConsumerCount);
+
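+        // If frame counts are being collected, wrap the reader so the number of frames
+        // received is recorded in the stagelet statistics when the reader is closed.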
+        return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
+            private int frameCount;
+
+            @Override
+            public void open() throws HyracksDataException {
+                frameCount = 0;
+                reader.open();
+            }
+
+            @Override
+            public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                boolean status = reader.nextFrame(buffer);
+                if (status) {
+                    ++frameCount;
+                }
+                return status;
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                reader.close();
+                stagelet.getStatistics().getStatisticsMap().put(
+                    "framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
+                    String.valueOf(frameCount));
+            }
+        } : reader;
+    }
+
+    @Override
+    public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId,
+        Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
+        final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
+        LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
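+        // Phase 2: connect each operator's output ports to the consumer endpoints
+        // resolved from the global port map assembled after phase 1.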
+        final Joblet ji = getLocalJoblet(jobId);
+        Stagelet si = (Stagelet) ji.getStagelet(stageId);
+        final Map<OperatorInstanceId, OperatorRunnable> honMap = si.getOperatorMap();
+
+        final Stagelet stagelet = (Stagelet) ji.getStagelet(stageId);
+
+        final JobSpecification spec = plan.getJobSpecification();
+
+        for (ActivityNodeId hanId : tasks.keySet()) {
+            IActivityNode han = plan.getActivityNodeMap().get(hanId);
+            IOperatorDescriptor op = han.getOwner();
+            List<IConnectorDescriptor> outputs = plan.getTaskOutputs(hanId);
+            for (int i : tasks.get(hanId)) {
+                OperatorRunnable or = honMap.get(new OperatorInstanceId(op.getOperatorId(), i));
+                if (outputs != null) {
+                    for (int j = 0; j < outputs.size(); ++j) {
+                        final IConnectorDescriptor conn = outputs.get(j);
+                        OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
+                            .getOperatorId();
+                        OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
+                            .getOperatorId();
+                        final int senderIndex = i;
+                        IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
+                            @Override
+                            public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
+                                PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
+                                    Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+                                Endpoint ep = globalPortMap.get(piId);
+                                if (ep == null) {
+                                    LOGGER.info("Got null Endpoint for " + piId);
+                                    throw new NullPointerException();
+                                }
+                                if (LOGGER.isLoggable(Level.FINEST)) {
+                                    LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
+                                }
+                                return createWriter(connectionManager.connect(ep.getNetworkAddress(), ep
+                                    .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+                            }
+                        };
+                        or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i, opPartitions.get(
+                            producerOpId).size(), opPartitions.get(consumerOpId).size()));
+                    }
+                }
+                stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
+            }
+        }
+    }
+
+    private IFrameWriter createWriter(final IFrameWriter writer, JobPlan plan, final IConnectorDescriptor conn,
+        final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
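+        // Counterpart of createReader: optionally wraps the writer to record the number
+        // of frames sent per (connector, sender, receiver) in the stagelet statistics.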
+        return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameWriter() {
+            private int frameCount;
+
+            @Override
+            public void open() throws HyracksDataException {
+                frameCount = 0;
+                writer.open();
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                ++frameCount;
+                writer.nextFrame(buffer);
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                writer.close();
+                stagelet.getStatistics().getStatisticsMap().put(
+                    "framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex,
+                    String.valueOf(frameCount));
+            }
+        } : writer;
+    }
+
+    @Override
+    public void commitJobletInitialization(UUID jobId, UUID stageId) throws Exception {
+        final Joblet ji = getLocalJoblet(jobId);
+        Stagelet si = (Stagelet) ji.getStagelet(stageId);
+        for (Endpoint e : si.getEndpointList()) {
+            connectionManager.unacceptConnection(e.getEndpointId());
+        }
+        si.setEndpointList(null);
+    }
+
+    private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
+        Joblet ji = jobletMap.get(jobId);
+        if (ji == null) {
+            ji = new Joblet(this, jobId);
+            jobletMap.put(jobId, ji);
+        }
+        return ji;
+    }
+
+    public Executor getExecutor() {
+        return executor;
+    }
+
+    @Override
+    public synchronized void cleanUpJob(UUID jobId) throws Exception {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Cleaning up after job: " + jobId);
+        }
+        jobletMap.remove(jobId);
+        connectionManager.dumpStats();
+    }
+
+    @Override
+    public void startStage(UUID jobId, UUID stageId) throws Exception {
+        Joblet ji = jobletMap.get(jobId);
+        if (ji != null) {
+            Stagelet s = ji.getStagelet(stageId);
+            if (s != null) {
+                s.start();
+            }
+        }
+    }
+
+    public void notifyStageComplete(UUID jobId, UUID stageId, int attempt, StageletStatistics stats) throws Exception {
+        ccs.notifyStageletComplete(jobId, stageId, attempt, id, stats);
+    }
+
+    public void notifyStageFailed(UUID jobId, UUID stageId, int attempt) throws Exception {
+        ccs.notifyStageletFailure(jobId, stageId, attempt, id);
+    }
+
+    @Override
+    public void notifyRegistration(IClusterController ccs) throws Exception {
+        this.ccs = ccs;
+    }
+
+    @Override
+    public NCConfig getConfiguration() throws Exception {
+        return ncConfig;
+    }
+
+    private class HeartbeatTask extends TimerTask {
+        private IClusterController cc;
+
+        public HeartbeatTask(IClusterController cc) {
+            this.cc = cc;
+        }
+
+        @Override
+        public void run() {
+            try {
+                cc.nodeHeartbeat(id);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Override
+    public synchronized void abortJoblet(UUID jobId, UUID stageId) throws Exception {
+        Joblet ji = jobletMap.get(jobId);
+        if (ji != null) {
+            Stagelet stagelet = ji.getStagelet(stageId);
+            if (stagelet != null) {
+                stagelet.abort();
+                connectionManager.abortConnections(jobId, stageId);
+            }
+        }
+    }
+}
\ No newline at end of file
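[Editor's note: the createWriter helper added above wraps the real sender-side writer in a counting decorator only when the job carries the COLLECT_FRAME_COUNTS flag. A minimal standalone sketch of that decorator pattern follows; the FrameWriter interface and the names used here are simplified stand-ins for illustration, not the actual Hyracks API.]

    import java.nio.ByteBuffer;
    import java.util.Map;

    // Simplified stand-in for the frame-writer contract used in the diff above.
    interface FrameWriter {
        void open() throws Exception;
        void nextFrame(ByteBuffer frame) throws Exception;
        void close() throws Exception;
    }

    // Decorator that counts frames passing through and publishes the total into a
    // shared statistics map when the writer is closed.
    final class CountingFrameWriter implements FrameWriter {
        private final FrameWriter delegate;
        private final Map<String, String> statistics;
        private final String statisticsKey;
        private int frameCount;

        CountingFrameWriter(FrameWriter delegate, Map<String, String> statistics, String statisticsKey) {
            this.delegate = delegate;
            this.statistics = statistics;
            this.statisticsKey = statisticsKey;
        }

        public void open() throws Exception {
            frameCount = 0;                 // reset for each attempt
            delegate.open();
        }

        public void nextFrame(ByteBuffer frame) throws Exception {
            ++frameCount;                   // count every frame pushed downstream
            delegate.nextFrame(frame);
        }

        public void close() throws Exception {
            delegate.close();
            statistics.put(statisticsKey, String.valueOf(frameCount));
        }
    }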
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Stagelet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
similarity index 78%
rename from hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Stagelet.java
rename to hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
index e82fa5b..e16cf30 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/Stagelet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.job;
+package edu.uci.ics.hyracks.controller.nodecontroller;
 
 import java.rmi.RemoteException;
 import java.util.Date;
@@ -40,19 +40,24 @@
 
     private final UUID stageId;
 
+    private final int attempt;
+
     private final Map<OperatorInstanceId, OperatorRunnable> honMap;
 
     private List<Endpoint> endpointList;
 
     private boolean started;
 
+    private volatile boolean abort;
+
     private final Set<OperatorInstanceId> pendingOperators;
 
     private final StageletStatistics stats;
 
-    public Stagelet(Joblet joblet, UUID stageId, String nodeId) throws RemoteException {
+    public Stagelet(Joblet joblet, UUID stageId, int attempt, String nodeId) throws RemoteException {
         this.joblet = joblet;
         this.stageId = stageId;
+        this.attempt = attempt;
         pendingOperators = new HashSet<OperatorInstanceId>();
         started = false;
         honMap = new HashMap<OperatorInstanceId, OperatorRunnable>();
@@ -85,6 +90,13 @@
         notifyAll();
     }
 
+    public synchronized void abort() {
+        this.abort = true;
+        for (OperatorRunnable r : honMap.values()) {
+            r.abort();
+        }
+    }
+
     public void installRunnable(final OperatorInstanceId opIId) {
         pendingOperators.add(opIId);
         final OperatorRunnable hon = honMap.get(opIId);
@@ -97,16 +109,22 @@
                     e.printStackTrace();
                     return;
                 }
+                if (abort) {
+                    return;
+                }
                 try {
                     LOGGER.log(Level.INFO, "Starting runnable for operator: " + joblet.getJobId() + ":" + stageId + ":"
-                            + opIId.getOperatorId() + ":" + opIId.getPartition());
+                        + opIId.getOperatorId() + ":" + opIId.getPartition());
                 } catch (Exception e) {
                     e.printStackTrace();
+                    // notifyOperatorFailure(opIId);
                 }
                 try {
                     hon.run();
-                } finally {
                     notifyOperatorCompletion(opIId);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    // notifyOperatorFailure(opIId);
                 }
             }
         });
@@ -117,15 +135,24 @@
         if (pendingOperators.isEmpty()) {
             stats.setEndTime(new Date());
             try {
-                joblet.notifyStageletComplete(stageId, stats);
+                joblet.notifyStageletComplete(stageId, attempt, stats);
             } catch (Exception e) {
                 e.printStackTrace();
             }
         }
     }
 
+    protected synchronized void notifyOperatorFailure(OperatorInstanceId opIId) {
+        abort();
+        try {
+            joblet.notifyStageletFailed(stageId, attempt);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
     private synchronized void waitUntilStarted() throws InterruptedException {
-        while (!started) {
+        while (!started && !abort) {
             wait();
         }
     }
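[Editor's note: the waitUntilStarted change above makes a stagelet thread wake up either when the stage is started or when it is aborted; both setters must signal under the same monitor for the two-part guard to work. A tiny self-contained latch illustrating the same idiom, assuming nothing beyond java.lang (names are illustrative, not Hyracks classes).]

    // Lets worker threads block until either start() or abort() is invoked,
    // mirroring the started/abort guard in Stagelet.waitUntilStarted().
    final class StartOrAbortLatch {
        private boolean started;
        private boolean aborted;

        public synchronized void start() {
            started = true;
            notifyAll();                    // wake any thread parked in awaitStartOrAbort()
        }

        public synchronized void abort() {
            aborted = true;
            notifyAll();
        }

        // Returns true if the stage actually started, false if it was aborted first.
        public synchronized boolean awaitStartOrAbort() throws InterruptedException {
            while (!started && !aborted) {
                wait();                     // releases the monitor until start()/abort() signals
            }
            return started && !aborted;
        }
    }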
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
index 937e0a4..5013812 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningConnectorDescriptor.java
@@ -37,17 +37,16 @@
 
     @Override
     public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         JobSpecification spec = plan.getJobSpecification();
-        final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
-        final HashDataWriter hashWriter = new HashDataWriter(ctx, consumerPartitionCount, edwFactory, spec
-                .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
+        final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, spec
+            .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
         return hashWriter;
     }
 
     @Override
     public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
index 9f36730..28cd1fa 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNHashPartitioningMergingConnectorDescriptor.java
@@ -36,7 +36,7 @@
     private final IBinaryComparatorFactory[] comparatorFactories;
 
     public MToNHashPartitioningMergingConnectorDescriptor(JobSpecification spec, ITuplePartitionComputerFactory tpcf,
-            int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
+        int[] sortFields, IBinaryComparatorFactory[] comparatorFactories) {
         super(spec);
         this.tpcf = tpcf;
         this.sortFields = sortFields;
@@ -45,19 +45,18 @@
 
     @Override
     public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         JobSpecification spec = plan.getJobSpecification();
-        final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
-        final HashDataWriter hashWriter = new HashDataWriter(ctx, consumerPartitionCount, edwFactory, spec
-                .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
+        final HashDataWriter hashWriter = new HashDataWriter(ctx, nConsumerPartitions, edwFactory, spec
+            .getConnectorRecordDescriptor(this), tpcf.createPartitioner());
         return hashWriter;
     }
 
     @Override
-    public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux, int index)
-            throws HyracksDataException {
+    public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-        for(int i = 0; i < comparatorFactories.length; ++i) {
+        for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
         JobSpecification spec = plan.getJobSpecification();
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
index f455f01..d3a8af9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNRangePartitioningConnectorDescriptor.java
@@ -41,7 +41,7 @@
         private final FrameTupleAccessor tupleAccessor;
 
         public RangeDataWriter(HyracksContext ctx, int consumerPartitionCount, IFrameWriter[] epWriters,
-                FrameTupleAppender[] appenders, RecordDescriptor recordDescriptor) {
+            FrameTupleAppender[] appenders, RecordDescriptor recordDescriptor) {
             this.consumerPartitionCount = consumerPartitionCount;
             this.epWriters = epWriters;
             this.appenders = appenders;
@@ -106,12 +106,11 @@
 
     @Override
     public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         JobSpecification spec = plan.getJobSpecification();
-        final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
-        final IFrameWriter[] epWriters = new IFrameWriter[consumerPartitionCount];
-        final FrameTupleAppender[] appenders = new FrameTupleAppender[consumerPartitionCount];
-        for (int i = 0; i < consumerPartitionCount; ++i) {
+        final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
+        final FrameTupleAppender[] appenders = new FrameTupleAppender[nConsumerPartitions];
+        for (int i = 0; i < nConsumerPartitions; ++i) {
             try {
                 epWriters[i] = edwFactory.createFrameWriter(i);
                 appenders[i] = new FrameTupleAppender(ctx);
@@ -120,14 +119,14 @@
                 throw new HyracksDataException(e);
             }
         }
-        final RangeDataWriter rangeWriter = new RangeDataWriter(ctx, consumerPartitionCount, epWriters, appenders, spec
-                .getConnectorRecordDescriptor(this));
+        final RangeDataWriter rangeWriter = new RangeDataWriter(ctx, nConsumerPartitions, epWriters, appenders, spec
+            .getConnectorRecordDescriptor(this));
         return rangeWriter;
     }
 
     @Override
     public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
index 50d804a..324c39f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MToNReplicatingConnectorDescriptor.java
@@ -36,11 +36,9 @@
 
     @Override
     public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-            int index) throws HyracksDataException {
-        JobSpecification spec = plan.getJobSpecification();
-        final int consumerPartitionCount = spec.getConsumer(this).getPartitions().length;
-        final IFrameWriter[] epWriters = new IFrameWriter[consumerPartitionCount];
-        for (int i = 0; i < consumerPartitionCount; ++i) {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+        final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
+        for (int i = 0; i < nConsumerPartitions; ++i) {
             epWriters[i] = edwFactory.createFrameWriter(i);
         }
         return new IFrameWriter() {
@@ -73,7 +71,7 @@
 
     @Override
     public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
index c46026a..fff3aa6 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/OneToOneConnectorDescriptor.java
@@ -34,13 +34,13 @@
 
     @Override
     public IFrameWriter createSendSideWriter(HyracksContext ctx, JobPlan plan, IEndpointDataWriterFactory edwFactory,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         return edwFactory.createFrameWriter(index);
     }
 
     @Override
     public IFrameReader createReceiveSideReader(HyracksContext ctx, JobPlan plan, IConnectionDemultiplexer demux,
-            int index) throws HyracksDataException {
+        int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         return new NonDeterministicFrameReader(ctx, demux);
     }
 }
\ No newline at end of file
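[Editor's note: the connector changes in this commit replace the old pattern of deriving the consumer fan-out from spec.getConsumer(this).getPartitions().length with explicit nProducerPartitions/nConsumerPartitions arguments supplied by the caller. A hedged sketch of a writer built from a caller-supplied fan-out; the interfaces and names are simplified stand-ins, not the Hyracks API.]

    import java.nio.ByteBuffer;

    // Simplified stand-ins for the endpoint writer factory and frame writer.
    interface Writer {
        void open() throws Exception;
        void nextFrame(ByteBuffer frame) throws Exception;
        void close() throws Exception;
    }

    interface WriterFactory {
        Writer createWriter(int consumerIndex) throws Exception;
    }

    // Broadcasts every frame to all consumers. The fan-out is passed in explicitly,
    // mirroring the new nConsumerPartitions parameter, instead of being looked up
    // from the job specification inside the connector.
    final class BroadcastWriter implements Writer {
        private final Writer[] targets;

        BroadcastWriter(WriterFactory factory, int nConsumerPartitions) throws Exception {
            targets = new Writer[nConsumerPartitions];
            for (int i = 0; i < nConsumerPartitions; ++i) {
                targets[i] = factory.createWriter(i);
            }
        }

        public void open() throws Exception {
            for (Writer w : targets) {
                w.open();
            }
        }

        public void nextFrame(ByteBuffer frame) throws Exception {
            for (Writer w : targets) {
                frame.rewind();             // each consumer sees the frame from the beginning
                w.nextFrame(frame);
            }
        }

        public void close() throws Exception {
            for (Writer w : targets) {
                w.close();
            }
        }
    }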
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java
index b411020..2e610a9 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/base/AbstractOperatorDescriptor.java
@@ -74,16 +74,6 @@
     }
 
     @Override
-    public String[] getPartitions() {
-        return partitions;
-    }
-
-    @Override
-    public void setPartitions(String[] partitions) {
-        this.partitions = partitions;
-    }
-
-    @Override
     public RecordDescriptor[] getOutputRecordDescriptors() {
         return recordDescriptors;
     }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
index 86c247d..b2dc234 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
@@ -34,8 +34,7 @@
 
 class GroupingHashTable {
     /**
-     * The pointers in the link store 3 int values for each entry in the
-     * hashtable: (bufferIdx, tIndex, accumulatorIdx).
+     * The pointers in the link store 3 int values for each entry in the hashtable: (bufferIdx, tIndex, accumulatorIdx).
      * 
      * @author vinayakb
      */
@@ -81,8 +80,8 @@
     private final FrameTupleAccessor storedKeysAccessor;
 
     GroupingHashTable(HyracksContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
-            ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
-            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
+        ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
+        RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
         this.ctx = ctx;
         appender = new FrameTupleAppender(ctx);
         buffers = new ArrayList<ByteBuffer>();
@@ -161,7 +160,7 @@
             }
             int saIndex = accumulatorSize++;
             aggregator = accumulators[saIndex] = aggregatorFactory.createAggregator(inRecordDescriptor,
-                    outRecordDescriptor);
+                outRecordDescriptor);
             aggregator.init(accessor, tIndex);
             link.add(sbIndex, stIndex, saIndex);
         }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java
index c1b917d..7ec0eab 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.util.ReflectionUtils;
 
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.controller.IClusterController;
@@ -61,7 +62,7 @@
         private Object value;
 
         public HDFSCustomReader(Map<String, String> jobConfMap, HadoopFileSplit inputSplit,
-                String inputFormatClassName, Reporter reporter) {
+            String inputFormatClassName, Reporter reporter) {
             try {
                 JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap) jobConfMap);
                 FileSystem fileSystem = null;
@@ -74,7 +75,7 @@
                 Class inputFormatClass = Class.forName(inputFormatClassName);
                 InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
                 hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(getFileSplit(inputSplit), conf,
-                        reporter);
+                    reporter);
                 if (hadoopRecordReader instanceof SequenceFileRecordReader) {
                     inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
                     inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
@@ -130,27 +131,27 @@
 
         private FileSplit getFileSplit(HadoopFileSplit hadoopFileSplit) {
             FileSplit fileSplit = new FileSplit(new Path(hadoopFileSplit.getFile()), hadoopFileSplit.getStart(),
-                    hadoopFileSplit.getLength(), hadoopFileSplit.getHosts());
+                hadoopFileSplit.getLength(), hadoopFileSplit.getHosts());
             return fileSplit;
         }
     }
 
     public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, JobSpecification spec,
-            HadoopFileSplit[] splits, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+        HadoopFileSplit[] splits, String inputFormatClassName, RecordDescriptor recordDescriptor) {
         super(spec, splits, recordDescriptor);
         this.inputFormatClassName = inputFormatClassName;
         this.jobConfMap = jobConfMap;
     }
 
     public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, InetSocketAddress nameNode,
-            JobSpecification spec, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+        JobSpecification spec, String inputFormatClassName, RecordDescriptor recordDescriptor) {
         super(spec, null, recordDescriptor);
         this.inputFormatClassName = inputFormatClassName;
         this.jobConfMap = jobConfMap;
     }
 
     public HadoopReadOperatorDescriptor(IClusterController clusterController, Map<String, String> jobConfMap,
-            JobSpecification spec, String fileSystemURL, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+        JobSpecification spec, String fileSystemURL, String inputFormatClassName, RecordDescriptor recordDescriptor) {
         super(spec, null, recordDescriptor);
         HadoopAdapter hadoopAdapter = HadoopAdapter.getInstance(fileSystemURL);
         String inputPathString = jobConfMap.get("mapred.input.dir");
@@ -170,7 +171,7 @@
     }
 
     private void configurePartitionConstraints(IClusterController clusterController,
-            Map<String, List<HadoopFileSplit>> blocksToRead) {
+        Map<String, List<HadoopFileSplit>> blocksToRead) {
         List<LocationConstraint> locationConstraints = new ArrayList<LocationConstraint>();
         Map<String, INodeController> registry = null;
         try {
@@ -223,8 +224,8 @@
             }
         }
 
-        PartitionConstraint partitionConstraint = new PartitionConstraint(locationConstraints
-                .toArray(new LocationConstraint[] {}));
+        PartitionConstraint partitionConstraint = new ExplicitPartitionConstraint(locationConstraints
+            .toArray(new LocationConstraint[] {}));
         this.setPartitionConstraint(partitionConstraint);
     }
 
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
index 425f967..bb1ba84 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/CCDriver.java
@@ -17,7 +17,7 @@
 import org.kohsuke.args4j.CmdLineParser;
 
 import edu.uci.ics.hyracks.config.CCConfig;
-import edu.uci.ics.hyracks.controller.ClusterControllerService;
+import edu.uci.ics.hyracks.controller.clustercontroller.ClusterControllerService;
 
 public class CCDriver {
     public static void main(String args[]) throws Exception {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
index 657b304..3c856ab 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/driver/NCDriver.java
@@ -19,7 +19,7 @@
 import edu.uci.ics.dcache.client.DCacheClient;
 import edu.uci.ics.dcache.client.DCacheClientConfig;
 import edu.uci.ics.hyracks.config.NCConfig;
-import edu.uci.ics.hyracks.controller.NodeControllerService;
+import edu.uci.ics.hyracks.controller.nodecontroller.NodeControllerService;
 
 public class NCDriver {
     public static void main(String args[]) throws Exception {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobControl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobControl.java
deleted file mode 100644
index 3259649..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobControl.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.job;
-
-import java.rmi.RemoteException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-
-public class JobControl {
-    private static final long serialVersionUID = 1L;
-
-    private final JobManager jobManager;
-
-    private final JobPlan jobPlan;
-
-    private final UUID jobId;
-
-    private final Map<UUID, StageProgress> stageProgressMap;
-
-    private final Set<UUID> completeStages;
-
-    private JobStatus jobStatus;
-
-    private JobStatistics jobStatistics;
-
-    public JobControl(JobManager jobManager, JobPlan jobPlan) throws RemoteException {
-        this.jobManager = jobManager;
-        this.jobPlan = jobPlan;
-        jobId = UUID.randomUUID();
-        stageProgressMap = new HashMap<UUID, StageProgress>();
-        completeStages = new HashSet<UUID>();
-        jobStatus = JobStatus.INITIALIZED;
-        jobStatistics = new JobStatistics();
-    }
-
-    public JobPlan getJobPlan() {
-        return jobPlan;
-    }
-
-    public UUID getJobId() {
-        return jobId;
-    }
-
-    public synchronized JobStatus getJobStatus() {
-        return jobStatus;
-    }
-
-    public Set<UUID> getCompletedStages() {
-        return completeStages;
-    }
-
-    public void setStatus(JobStatus status) {
-        jobStatus = status;
-    }
-
-    public StageProgress getStageProgress(int stageId) {
-        return stageProgressMap.get(stageId);
-    }
-
-    public void setStageProgress(UUID stageId, StageProgress stageProgress) {
-        stageProgressMap.put(stageId, stageProgress);
-    }
-
-    public synchronized void notifyStageletComplete(UUID stageId, String nodeId, StageletStatistics ss)
-            throws Exception {
-        StageProgress stageProgress = stageProgressMap.get(stageId);
-        stageProgress.markNodeComplete(nodeId);
-        StageStatistics stageStatistics = stageProgress.getStageStatistics();
-        stageStatistics.addStageletStatistics(ss);
-        if (stageProgress.stageComplete()) {
-            jobStatistics.addStageStatistics(stageStatistics);
-            stageProgressMap.remove(stageId);
-            completeStages.add(stageId);
-            jobManager.advanceJob(this);
-        }
-    }
-
-    public synchronized JobStatistics waitForCompletion() throws Exception {
-        while (jobStatus != JobStatus.TERMINATED) {
-            wait();
-        }
-        return jobStatistics;
-    }
-
-    public synchronized void notifyJobComplete() {
-        jobStatus = JobStatus.TERMINATED;
-        notifyAll();
-    }
-
-    public JobStatistics getJobStatistics() {
-        return jobStatistics;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobManager.java
deleted file mode 100644
index f976bb9..0000000
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/job/JobManager.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.job;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.Vector;
-import java.util.concurrent.Semaphore;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.Endpoint;
-import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
-import edu.uci.ics.hyracks.api.controller.INodeController;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
-import edu.uci.ics.hyracks.api.job.JobFlag;
-import edu.uci.ics.hyracks.api.job.JobPlan;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.api.job.JobStage;
-import edu.uci.ics.hyracks.api.job.JobStatus;
-import edu.uci.ics.hyracks.api.job.statistics.JobStatistics;
-import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
-import edu.uci.ics.hyracks.controller.ClusterControllerService;
-import edu.uci.ics.hyracks.dataflow.base.IOperatorDescriptorVisitor;
-import edu.uci.ics.hyracks.dataflow.util.PlanUtils;
-
-public class JobManager {
-    private static final Logger LOGGER = Logger.getLogger(JobManager.class.getName());
-    private ClusterControllerService ccs;
-
-    private final Map<UUID, JobControl> jobMap;
-
-    public JobManager(ClusterControllerService ccs) {
-        this.ccs = ccs;
-        jobMap = new HashMap<UUID, JobControl>();
-    }
-
-    public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
-        JobPlanBuilder builder = new JobPlanBuilder();
-        builder.init(jobSpec, jobFlags);
-        JobControl jc = new JobControl(this, builder.plan());
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(jc.getJobPlan().toString());
-        }
-        jobMap.put(jc.getJobId(), jc);
-
-        return jc.getJobId();
-    }
-
-    public synchronized void start(UUID jobId) throws Exception {
-        JobControl jobControlImpl = jobMap.get(jobId);
-        LOGGER
-                .info("Starting job: " + jobControlImpl.getJobId() + ", Current status: "
-                        + jobControlImpl.getJobStatus());
-        if (jobControlImpl.getJobStatus() != JobStatus.INITIALIZED) {
-            return;
-        }
-        jobControlImpl.getJobStatistics().setStartTime(new Date());
-        jobControlImpl.setStatus(JobStatus.RUNNING);
-        schedule(jobControlImpl);
-    }
-
-    public synchronized void advanceJob(JobControl jobControlImpl) throws Exception {
-        schedule(jobControlImpl);
-    }
-
-    private void schedule(JobControl jobControlImpl) throws Exception {
-        JobPlan plan = jobControlImpl.getJobPlan();
-        JobStage endStage = plan.getEndStage();
-
-        Set<UUID> completedStages = jobControlImpl.getCompletedStages();
-        List<JobStage> runnableStages = new ArrayList<JobStage>();
-        findRunnableStages(endStage, runnableStages, completedStages, new HashSet<UUID>());
-        if (runnableStages.size() == 1 && runnableStages.get(0).getTasks().isEmpty()) {
-            LOGGER.info("Job " + jobControlImpl.getJobId() + " complete");
-            jobControlImpl.getJobStatistics().setEndTime(new Date());
-            cleanUp(jobControlImpl);
-            jobControlImpl.notifyJobComplete();
-        } else {
-            for (JobStage s : runnableStages) {
-                if (s.isStarted()) {
-                    continue;
-                }
-                startStage(jobControlImpl, s);
-            }
-        }
-    }
-
-    private void cleanUp(JobControl jc) {
-        jobMap.remove(jc.getJobId());
-        ccs.notifyJobComplete(jc.getJobId());
-    }
-
-    private void startStage(JobControl jc, JobStage stage) throws Exception {
-        stage.setStarted();
-        Set<String> candidateNodes = deploy(jc, stage);
-        for (String nodeId : candidateNodes) {
-            ccs.lookupNode(nodeId).startStage(jc.getJobId(), stage.getId());
-        }
-    }
-
-    private void findRunnableStages(JobStage s, List<JobStage> runnableStages, Set<UUID> completedStages, Set<UUID> seen) {
-        boolean runnable = true;
-        if (seen.contains(s.getId())) {
-            return;
-        }
-        seen.add(s.getId());
-        for (JobStage dep : s.getDependencies()) {
-            boolean depComplete = completedStages.contains(dep.getId());
-            runnable = runnable && depComplete;
-            if (!depComplete) {
-                findRunnableStages(dep, runnableStages, completedStages, seen);
-            }
-        }
-        if (runnable) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Runnable stage: " + s);
-            }
-            runnableStages.add(s);
-        }
-    }
-
-    private Set<String> deploy(JobControl jc, JobStage stage) throws Exception {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Deploying: " + stage);
-        }
-        Set<String> candidateNodes = plan(jc, stage);
-        StageProgress stageProgress = new StageProgress(stage.getId());
-        stageProgress.addPendingNodes(candidateNodes);
-        Map<PortInstanceId, Endpoint> globalPortMap = runRemote(new Phase1Installer(jc, stage),
-                new PortMapMergingAccumulator(), candidateNodes);
-        runRemote(new Phase2Installer(jc, stage, globalPortMap), null, candidateNodes);
-        runRemote(new Phase3Installer(jc, stage), null, candidateNodes);
-        jc.setStageProgress(stage.getId(), stageProgress);
-        return candidateNodes;
-    }
-
-    private interface RemoteOp<T> {
-        public T execute(INodeController node) throws Exception;
-    }
-
-    private interface Accumulator<T, R> {
-        public void accumulate(T o);
-
-        public R getResult();
-    }
-
-    private static class Phase1Installer implements RemoteOp<Map<PortInstanceId, Endpoint>> {
-        private JobControl jc;
-        private JobStage stage;
-
-        public Phase1Installer(JobControl jc, JobStage stage) {
-            this.jc = jc;
-            this.stage = stage;
-        }
-
-        @Override
-        public Map<PortInstanceId, Endpoint> execute(INodeController node) throws Exception {
-            return node.initializeJobletPhase1(jc.getJobId(), jc.getJobPlan(), stage);
-        }
-
-        @Override
-        public String toString() {
-            return jc.getJobId() + " Distribution Phase 1";
-        }
-    }
-
-    private static class Phase2Installer implements RemoteOp<Void> {
-        private JobControl jc;
-        private JobStage stage;
-        private Map<PortInstanceId, Endpoint> globalPortMap;
-
-        public Phase2Installer(JobControl jc, JobStage stage, Map<PortInstanceId, Endpoint> globalPortMap) {
-            this.jc = jc;
-            this.stage = stage;
-            this.globalPortMap = globalPortMap;
-        }
-
-        @Override
-        public Void execute(INodeController node) throws Exception {
-            node.initializeJobletPhase2(jc.getJobId(), jc.getJobPlan(), stage, globalPortMap);
-            return null;
-        }
-
-        @Override
-        public String toString() {
-            return jc.getJobId() + " Distribution Phase 2";
-        }
-    }
-
-    private static class Phase3Installer implements RemoteOp<Void> {
-        private JobControl jc;
-        private JobStage stage;
-
-        public Phase3Installer(JobControl jc, JobStage stage) {
-            this.jc = jc;
-            this.stage = stage;
-        }
-
-        @Override
-        public Void execute(INodeController node) throws Exception {
-            node.commitJobletInitialization(jc.getJobId(), jc.getJobPlan(), stage);
-            return null;
-        }
-
-        @Override
-        public String toString() {
-            return jc.getJobId() + " Distribution Phase 3";
-        }
-    }
-
-    private static class PortMapMergingAccumulator implements
-            Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
-        Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
-
-        @Override
-        public void accumulate(Map<PortInstanceId, Endpoint> o) {
-            portMap.putAll(o);
-        }
-
-        @Override
-        public Map<PortInstanceId, Endpoint> getResult() {
-            return portMap;
-        }
-    }
-
-    private <T, R> R runRemote(final RemoteOp<T> remoteOp, final Accumulator<T, R> accumulator,
-            Set<String> candidateNodes) throws Exception {
-        LOGGER.log(Level.INFO, remoteOp + " : " + candidateNodes);
-
-        final Semaphore installComplete = new Semaphore(candidateNodes.size());
-        final List<Exception> errors = new Vector<Exception>();
-        for (final String nodeId : candidateNodes) {
-            final INodeController node = ccs.lookupNode(nodeId);
-
-            installComplete.acquire();
-            Runnable remoteRunner = new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        T t = remoteOp.execute(node);
-                        if (accumulator != null) {
-                            synchronized (accumulator) {
-                                accumulator.accumulate(t);
-                            }
-                        }
-                    } catch (Exception e) {
-                        errors.add(e);
-                    } finally {
-                        installComplete.release();
-                    }
-                }
-            };
-
-            ccs.getExecutor().execute(remoteRunner);
-        }
-        installComplete.acquire(candidateNodes.size());
-        if (!errors.isEmpty()) {
-            throw errors.get(0);
-        }
-        return accumulator == null ? null : accumulator.getResult();
-    }
-
-    private Set<String> plan(JobControl jc, JobStage stage) throws Exception {
-        LOGGER.log(Level.INFO, String.valueOf(jc.getJobId()) + ": Planning");
-
-        final Set<OperatorDescriptorId> opSet = new HashSet<OperatorDescriptorId>();
-        for (ActivityNodeId t : stage.getTasks()) {
-            opSet.add(jc.getJobPlan().getActivityNodeMap().get(t).getOwner().getOperatorId());
-        }
-
-        final Set<String> candidateNodes = new HashSet<String>();
-
-        IOperatorDescriptorVisitor visitor = new IOperatorDescriptorVisitor() {
-            @Override
-            public void visit(IOperatorDescriptor op) throws Exception {
-                if (!opSet.contains(op.getOperatorId())) {
-                    return;
-                }
-                String[] partitions = op.getPartitions();
-                if (partitions == null) {
-                    PartitionConstraint pc = op.getPartitionConstraint();
-                    LocationConstraint[] lcs = pc.getLocationConstraints();
-                    String[] assignment = new String[lcs.length];
-                    for (int i = 0; i < lcs.length; ++i) {
-                        String nodeId = ((AbsoluteLocationConstraint) lcs[i]).getLocationId();
-                        assignment[i] = nodeId;
-                    }
-                    op.setPartitions(assignment);
-                    partitions = assignment;
-                }
-                for (String p : partitions) {
-                    candidateNodes.add(p);
-                }
-            }
-        };
-
-        PlanUtils.visit(jc.getJobPlan().getJobSpecification(), visitor);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(stage + " Candidate nodes: " + candidateNodes);
-        }
-        return candidateNodes;
-    }
-
-    public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, String nodeId,
-            StageletStatistics statistics) throws Exception {
-        JobControl jc = jobMap.get(jobId);
-        if (jc != null) {
-            jc.notifyStageletComplete(stageId, nodeId, statistics);
-        }
-    }
-
-    public synchronized JobStatus getJobStatus(UUID jobId) {
-        JobControl jc = jobMap.get(jobId);
-        return jc.getJobStatus();
-    }
-
-    public JobStatistics waitForCompletion(UUID jobId) throws Exception {
-        JobControl jc;
-        synchronized (this) {
-            jc = jobMap.get(jobId);
-        }
-        if (jc != null) {
-            return jc.waitForCompletion();
-        }
-        return null;
-    }
-}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
index 5964d07..149f867 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/runtime/OperatorRunnable.java
@@ -25,6 +25,7 @@
     private IOperatorNodePushable opNode;
     private IFrameReader reader;
     private ByteBuffer buffer;
+    private volatile boolean abort;
 
     public OperatorRunnable(HyracksContext ctx, IOperatorNodePushable opNode) {
         this.opNode = opNode;
@@ -39,6 +40,10 @@
         this.reader = reader;
     }
 
+    public void abort() {
+        abort = true;
+    }
+
     @Override
     public void run() {
         try {
@@ -46,6 +51,9 @@
             if (reader != null) {
                 reader.open();
                 while (reader.nextFrame(buffer)) {
+                    if (abort) {
+                        break;
+                    }
                     buffer.flip();
                     opNode.nextFrame(buffer);
                     buffer.compact();
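[Editor's note: the abort flag added to OperatorRunnable above is a cooperative-cancellation pattern: the read loop polls a volatile boolean and exits at a frame boundary rather than being interrupted mid-frame. A minimal standalone sketch of the same idea; the names are illustrative, not the Hyracks classes.]

    import java.nio.ByteBuffer;

    // Illustrative frame source; the real code reads frames from a connection demultiplexer.
    interface FrameSource {
        boolean nextFrame(ByteBuffer buffer) throws Exception;
    }

    final class AbortableFrameLoop implements Runnable {
        private final FrameSource source;
        private final ByteBuffer buffer = ByteBuffer.allocate(32 * 1024);
        // volatile so a flag set by the controller thread is seen promptly by the worker thread
        private volatile boolean abort;

        AbortableFrameLoop(FrameSource source) {
            this.source = source;
        }

        public void abort() {
            abort = true;                   // request cancellation; the loop exits at the next frame boundary
        }

        @Override
        public void run() {
            try {
                while (source.nextFrame(buffer)) {
                    if (abort) {
                        break;              // stop cleanly between frames, never mid-frame
                    }
                    buffer.flip();
                    // ... push the frame to the operator here ...
                    buffer.compact();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }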
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
new file mode 100644
index 0000000..3f03526
--- /dev/null
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -0,0 +1,310 @@
+program hyrackscc;
+
+import java.util.UUID;
+import java.util.Set;
+
+import jol.types.basic.Tuple;
+import jol.types.basic.TupleSet;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+
+define(activitystage_temp, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage_INITIAL activitystage_temp(JobId, OperatorId, ActivityId, 0) :-
+    activitynode(JobId, OperatorId, ActivityId, _);
+
+activitystage_BLOCKED activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+    activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityblocked(JobId, OperatorId1, ActivityId1, OperatorId2, ActivityId2),
+    StageNumber2 <= StageNumber1
+    {
+        StageNumber := StageNumber1 + 1;
+    };
+
+activitystage_PIPELINED_1 activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber) :-
+    activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+    activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+    connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+    StageNumber1 != StageNumber2
+    {
+        StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+    };
+
+activitystage_PIPELINED_2 activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber) :-
+    activitystage_temp(JobId, OperatorId1, ActivityId1, StageNumber1),
+    activitystage_temp(JobId, OperatorId2, ActivityId2, StageNumber2),
+    activityconnection(JobId, OperatorId1, Operator1Port, edu.uci.ics.hyracks.api.dataflow.Direction.OUTPUT, ActivityId1, _),
+    activityconnection(JobId, OperatorId2, Operator2Port, edu.uci.ics.hyracks.api.dataflow.Direction.INPUT, ActivityId2, _),
+    connectordescriptor(JobId, _, OperatorId1, Operator1Port, OperatorId2, Operator2Port, _),
+    StageNumber1 != StageNumber2
+    {
+        StageNumber := java.lang.Math.max(StageNumber1, StageNumber2);
+    };
+
+watch(activitystage_temp, a);
+
+watch(activityconnection, a);
+watch(activityblocked, a);
+watch(operatordescriptor, a);
+watch(connectordescriptor, a);
+
+watch(activitystage, a);
+watch(activitystage, i);
+watch(activitystage, d);
+
+define(activitystage, keys(0, 1, 2), {UUID, OperatorDescriptorId, ActivityNodeId, Integer});
+
+activitystage(JobId, OperatorId, ActivityId, max<StageNumber>) :-
+    activitystage_temp(JobId, OperatorId, ActivityId, StageNumber);
+
+define(jobstage, keys(0, 1), {UUID, Integer, UUID});
+
+jobstage(JobId, StageNumber, StageId) :-
+    activitystage(JobId, _, _, StageNumber)
+    {
+        StageId := java.util.UUID.randomUUID();
+    };
+
+watch(jobstage, a);
+
+define(jobattempt, keys(), {UUID, Integer});
+
+jobattempt(JobId, 0) :-
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, _, _, _),
+    jobstart(JobId, _);
+
+jobattempt(JobId, NextAttempt) :-
+    jobattempt(JobId, Attempt),
+    stagestart(JobId, _, Attempt),
+    abortcomplete(JobId, _, Attempt)
+    {
+        NextAttempt := Attempt + 1;
+    };
+
+define(stagestart, keys(), {UUID, Integer, Integer});
+define(stagefinish, keys(0, 1, 2), {UUID, Integer, Integer, Set});
+
+watch(jobstart, i);
+
+stagestart_INITIAL stagestart(JobId, 0, Attempt) :-
+    jobattempt#insert(JobId, Attempt);
+
+update_job_status_RUNNING job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, null) :-
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.INITIALIZED, JobSpec, JobPlan, _),
+    jobstart(JobId, _);
+
+stagestart_NEXT stagestart(JobId, NextStageNumber, Attempt) :-
+    stagestart(JobId, StageNumber, Attempt),
+    stagefinish#insert(StageId, StageNumber, Attempt, _)
+    {
+        NextStageNumber := StageNumber + 1;
+    };
+
+watch(stagestart, a);
+watch(stagestart, d);
+
+define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, Integer});
+
+operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit) :-
+    operatorlocation(JobId, OperatorId, NodeId, Partition, Benefit),
+    availablenodes(NodeId);
+
+watch(availablenodes, a);
+watch(availablenodes, i);
+watch(availablenodes, d);
+
+define(availablenodecount, keys(0), {Integer, Integer});
+
+watch(availablenodecount, a);
+watch(availablenodecount, i);
+watch(availablenodecount, d);
+
+availablenodecount(0, count<NodeId>) :-
+    availablenodes(NodeId);
+
+watch(rankedavailablenodes, a);
+watch(rankedavailablenodes, i);
+watch(rankedavailablenodes, d);
+
+watch(operatorlocationcandidates, a);
+watch(operatorlocationcandidates, i);
+watch(operatorlocationcandidates, d);
+
+define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+maxoperatorlocationbenefit(JobId, OperatorId, Partition, max<Benefit>) :-
+    operatorlocationcandidates(JobId, OperatorId, _, Partition, Benefit);
+
+watch(maxoperatorlocationbenefit, a);
+watch(maxoperatorlocationbenefit, i);
+watch(maxoperatorlocationbenefit, d);
+
+define(attemptoperatorlocationdecision, keys(0, 1, 3, 4), {UUID, OperatorDescriptorId, String, Integer, Integer});
+
+watch(attemptoperatorlocationdecision, a);
+watch(attemptoperatorlocationdecision, i);
+watch(attemptoperatorlocationdecision, d);
+
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+    jobattempt#insert(JobId, Attempt),
+    operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit),
+    maxoperatorlocationbenefit(JobId, OperatorId, Partition, Benefit);
+
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+    jobattempt#insert(JobId, Attempt),
+    operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, CloneRank),
+    rankedavailablenodes(NodeId, NodeRank),
+    availablenodecount(_, NodeCount),
+    NodeRank == CloneRank % NodeCount;
+
+define(operatorclonecount_temp, keys(), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecount_temp(JobId, OperatorId, NPartitions, 0) :-
+    operatorclonecount(JobId, OperatorId, NPartitions);
+
+define(operatorclonecountexpansiontotalorder, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, Rank) :-
+    expandpartitioncountconstraint(operatorclonecount_temp(JobId, OperatorId, Partition, Rank));
+
+watch(operatorclonecountexpansiontotalorder, a);
+watch(operatorclonecountexpansiontotalorder, i);
+watch(operatorclonecountexpansiontotalorder, d);
+
+watch(operatorclonecount, a);
+watch(operatorclonecount, i);
+watch(operatorclonecount, d);
+
+define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
+
+activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
+    stagestart#insert(JobId, StageNumber, Attempt),
+    operatordescriptor(JobId, OperatorId, _, _),
+    activitystage(JobId, OperatorId, ActivityId, StageNumber),
+    jobstage(JobId, StageNumber, StageId),
+    attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt);
+
+watch(activitystart, a);
+
+define(stageletstart, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
+
+stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, set<ActivityInfo>) :-
+    activitystart#insert(JobId, _, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition),
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, _, JobPlan, _)
+    {
+        ActivityInfo := [ActivityId, Partition];
+    };
+
+watch(stageletstart, a);
+watch(stageletstart, i);
+
+define(startmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
+
+startmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+    stageletstart#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityInfoSet),
+    availablenodes(NodeId),
+    ActivityInfoSet.size() != 0
+    {
+        Tuple := [NodeId, ActivityInfoSet];
+    };
+
+startmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+    startmessage_agg(JobId, StageId, Attempt, JobPlan, TSet);
+
+watch(startmessage, a);
+watch(startmessage, i);
+
+define(stageletabort, keys(0, 1, 3, 4), {UUID, UUID, JobPlan, String, Integer, Set});
+
+stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet) :-
+    stageletfailure(JobId, StageId, NodeId, Attempt),
+    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet);
+
+stageletabort(JobId, StageId, JobPlan, NodeIdOther, Attempt, ActivityIdSet) :-
+    stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, _),
+    stageletstart(JobId, StageId, _, NodeIdOther, Attempt, ActivityIdSet),
+    failednodes#insert(NodeId),
+    notin stageletcomplete(JobId, StageId, NodeId, Attempt, _);
+
+watch(stageletabort, a);
+watch(stageletabort, i);
+watch(stageletabort, d);
+
+define(stageabort, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageabort(JobId, StageId, Attempt, set<NodeId>) :-
+    stageletabort#insert(JobId, StageId, _, NodeId, Attempt, _);
+
+define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
+
+abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
+    stageletabort#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+    availablenodes(NodeId)
+    {
+        Tuple := [NodeId, ActivityIdSet];
+    };
+
+abortmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
+    abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet),
+    TSet.size() != 0;
+
+watch(abortmessage, a);
+watch(abortmessage, i);
+
+define(stageletabortcomplete, keys(), {UUID, UUID, String, Integer});
+
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+    abortnotify(JobId, StageId, NodeId, Attempt);
+
+stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
+    stageletabort(JobId, StageId, _, NodeId, Attempt, _),
+    notin availablenodes(NodeId);
+
+define(stageletabortcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletabortcomplete_agg(JobId, StageId, Attempt, set<NodeId>) :-
+    stageletabortcomplete(JobId, StageId, NodeId, Attempt);
+
+define(abortcomplete, keys(), {UUID, UUID, Integer});
+
+abortcomplete(JobId, StageId, Attempt) :-
+    stageletabortcomplete_agg(JobId, StageId, Attempt, NodeIdSet1),
+    stageabort(JobId, StageId, Attempt, NodeIdSet2),
+    NodeIdSet1.size() == NodeIdSet2.size();
+
+define(stageletcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
+
+stageletcomplete_agg(JobId, StageId, Attempt, set<Statistics>) :-
+    stageletcomplete(JobId, StageId, NodeId, Attempt, Statistics);
+
+stagefinish(JobId, StageNumber, Attempt, SSet) :-
+    startmessage_agg(JobId, StageId, Attempt, _, TSet),
+    stageletcomplete_agg(JobId, StageId, Attempt, SSet),
+    jobstage(JobId, StageNumber, StageId),
+    TSet.size() == SSet.size();
+
+update_job_status_TERMINATED job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.TERMINATED, JobSpec, JobPlan, null) :-
+    job(JobId, edu.uci.ics.hyracks.api.job.JobStatus.RUNNING, JobSpec, JobPlan, _),
+    stagestart#insert(JobId, StageNumber, Attempt),
+    stagefinish(JobId, _, Attempt, SSet),
+    notin jobstage(JobId, StageNumber);
+
+define(jobcleanup_agg, {UUID, Set});
+
+jobcleanup_agg(JobId, set<NodeId>) :-
+    stagestart#insert(JobId, StageNumber, Attempt),
+    stagefinish(JobId, _, Attempt, _),
+    attemptoperatorlocationdecision(JobId, _, NodeId, _, Attempt),
+    notin jobstage(JobId, StageNumber);
+
+jobcleanup(JobId, NodeIdSet) :-
+    jobcleanup_agg(JobId, NodeIdSet);
\ No newline at end of file
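[Editor's note: the Overlog rules above compute a stage number per activity as a fixpoint: every activity starts at stage 0, an activity blocked on another moves to at least one past its blocker's stage, and two activities joined by a pipelined connector are pulled up to the maximum of their stages. A rough imperative Java sketch of that fixpoint follows; the record types are illustrative stand-ins for the activityblocked and connectordescriptor relations, not part of Hyracks.]

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative edge types standing in for the Overlog relations.
    record Blocked(String blocker, String blocked) {}
    record Pipelined(String producer, String consumer) {}

    final class StageNumbering {
        // Returns a stage number per activity id, mirroring the activitystage fixpoint.
        static Map<String, Integer> assignStages(List<String> activities, List<Blocked> blockedEdges,
                List<Pipelined> pipelinedEdges) {
            Map<String, Integer> stage = new HashMap<>();
            for (String a : activities) {
                stage.put(a, 0);                             // activitystage_INITIAL
            }
            boolean changed = true;
            while (changed) {                                // iterate until no rule fires
                changed = false;
                for (Blocked e : blockedEdges) {             // activitystage_BLOCKED
                    int wanted = stage.get(e.blocker()) + 1;
                    if (stage.get(e.blocked()) < wanted) {
                        stage.put(e.blocked(), wanted);
                        changed = true;
                    }
                }
                for (Pipelined e : pipelinedEdges) {         // activitystage_PIPELINED_1 / _2
                    int max = Math.max(stage.get(e.producer()), stage.get(e.consumer()));
                    if (stage.get(e.producer()) < max || stage.get(e.consumer()) < max) {
                        stage.put(e.producer(), max);
                        stage.put(e.consumer(), max);
                        changed = true;
                    }
                }
            }
            return stage;
        }
    }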
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 94bc57a..8505531 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -24,8 +24,8 @@
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.config.CCConfig;
 import edu.uci.ics.hyracks.config.NCConfig;
-import edu.uci.ics.hyracks.controller.ClusterControllerService;
-import edu.uci.ics.hyracks.controller.NodeControllerService;
+import edu.uci.ics.hyracks.controller.clustercontroller.ClusterControllerService;
+import edu.uci.ics.hyracks.controller.nodecontroller.NodeControllerService;
 
 public abstract class AbstractIntegrationTest {
     public static final String NC1_ID = "nc1";
@@ -39,6 +39,7 @@
     public static void init() throws Exception {
         CCConfig ccConfig = new CCConfig();
         ccConfig.port = 39001;
+        ccConfig.useJOL = true;
         cc = new ClusterControllerService(ccConfig);
         cc.start();
 
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index a3beb11..b7c2e57 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -19,6 +19,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -49,57 +50,85 @@
     public void countOfCountsSingleNC() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
-        RecordDescriptor desc = new RecordDescriptor(
-                new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+        FileSplit[] splits = new FileSplit[] {
+            new FileSplit(NC1_ID, new File("data/words.txt"))
+        };
+        RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
-        PartitionConstraint csvPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc);
-        PartitionConstraint sorterPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
+            0
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, desc);
+        PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         sorter.setPartitionConstraint(sorterPartitionConstraint);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
-        PartitionConstraint groupPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+            StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
+        });
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+            0
+        }, new IComparatorFactory[] {
+            StringComparatorFactory.INSTANCE
+        }, new SumStringGroupAggregator(0), desc2);
+        PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         group.setPartitionConstraint(groupPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc2);
-        PartitionConstraint sorterPartitionConstraint2 = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] {
+            1
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, desc2);
+        PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         sorter2.setPartitionConstraint(sorterPartitionConstraint2);
 
-        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
-        PartitionConstraint groupPartitionConstraint2 = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+            1
+        }, new IComparatorFactory[] {
+            StringComparatorFactory.INSTANCE
+        }, new SumStringGroupAggregator(1), desc2);
+        PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         group2.setPartitionConstraint(groupPartitionConstraint2);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                0
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, sorter, 0, group, 0);
 
         IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -116,59 +145,91 @@
     public void countOfCountsMultiNC() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
-        RecordDescriptor desc = new RecordDescriptor(
-                new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+        FileSplit[] splits = new FileSplit[] {
+            new FileSplit(NC1_ID, new File("data/words.txt"))
+        };
+        RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
-        PartitionConstraint csvPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc);
-        PartitionConstraint sorterPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
+            0
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, desc);
+        PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID),
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID)
+        });
         sorter.setPartitionConstraint(sorterPartitionConstraint);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
-        PartitionConstraint groupPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
+        });
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+            0
+        }, new IComparatorFactory[] {
+            StringComparatorFactory.INSTANCE
+        }, new SumStringGroupAggregator(0), desc2);
+        PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID),
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID)
+        });
         group.setPartitionConstraint(groupPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc2);
-        PartitionConstraint sorterPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] {
+            1
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, desc2);
+        PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         sorter2.setPartitionConstraint(sorterPartitionConstraint2);
 
-        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
-        PartitionConstraint groupPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+            1
+        }, new IComparatorFactory[] {
+            StringComparatorFactory.INSTANCE
+        }, new SumStringGroupAggregator(1), desc2);
+        PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         group2.setPartitionConstraint(groupPartitionConstraint2);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                0
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, sorter, 0, group, 0);
 
         IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -185,59 +246,91 @@
     public void countOfCountsExternalSortMultiNC() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
-        RecordDescriptor desc = new RecordDescriptor(
-                new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+        FileSplit[] splits = new FileSplit[] {
+            new FileSplit(NC1_ID, new File("data/words.txt"))
+        };
+        RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
-        PartitionConstraint csvPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 0 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc);
-        PartitionConstraint sorterPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] {
+            0
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, desc);
+        PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID),
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID)
+        });
         sorter.setPartitionConstraint(sorterPartitionConstraint);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
-        PartitionConstraint groupPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
+        });
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+            0
+        }, new IComparatorFactory[] {
+            StringComparatorFactory.INSTANCE
+        }, new SumStringGroupAggregator(0), desc2);
+        PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID),
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID)
+        });
         group.setPartitionConstraint(groupPartitionConstraint);
 
-        ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 1 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc2);
-        PartitionConstraint sorterPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] {
+            1
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, desc2);
+        PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         sorter2.setPartitionConstraint(sorterPartitionConstraint2);
 
-        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
-        PartitionConstraint groupPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+            1
+        }, new IComparatorFactory[] {
+            StringComparatorFactory.INSTANCE
+        }, new SumStringGroupAggregator(1), desc2);
+        PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         group2.setPartitionConstraint(groupPartitionConstraint2);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                0
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, sorter, 0, group, 0);
 
         IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
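
Aside from whitespace reformatting, the substantive change in CountOfCountsTest (and in the tests that follow) is that explicit node placements are now expressed with ExplicitPartitionConstraint rather than by instantiating PartitionConstraint directly. The helper below is a hypothetical condensation of that repeated pattern (ConstraintHelper and onNodes are invented names); the constraint classes themselves are the ones imported by these tests:

import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;

// Hypothetical helper: pin an operator's partitions to an explicit list of node controller ids.
final class ConstraintHelper {
    static PartitionConstraint onNodes(String... nodeIds) {
        LocationConstraint[] locations = new LocationConstraint[nodeIds.length];
        for (int i = 0; i < nodeIds.length; ++i) {
            locations[i] = new AbsoluteLocationConstraint(nodeIds[i]);
        }
        return new ExplicitPartitionConstraint(locations);
    }

    private ConstraintHelper() {
    }
}

With such a helper, the repeated blocks would read, for example, sorter.setPartitionConstraint(ConstraintHelper.onNodes(NC1_ID, NC2_ID, NC1_ID, NC2_ID)).
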
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
index d619bba..24d45fa 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
@@ -19,6 +19,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -36,19 +37,23 @@
     public void scanPrint01() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] splits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/words.txt")),
-                new FileSplit(NC1_ID, new File("data/words.txt")) };
-        RecordDescriptor desc = new RecordDescriptor(
-                new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+        FileSplit[] splits = new FileSplit[] {
+            new FileSplit(NC2_ID, new File("data/words.txt")), new FileSplit(NC1_ID, new File("data/words.txt"))
+        };
+        RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
-        PartitionConstraint csvPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID)
+        });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index cd254e2..0b19cea 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -19,6 +19,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -43,39 +44,57 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
-                new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+            new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+        };
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
-        PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            '|', "'\"");
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
-        PartitionConstraint sortersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
+            1
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, ordersDesc);
+        PartitionConstraint sortersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         sorter.setPartitionConstraint(sortersPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
         spec.connect(new MToNHashPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                new int[] { 1 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }),
-                new int[] { 1 }, new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }), sorter, 0,
-                printer, 0);
-        
+            new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }), new int[] {
+            1
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }), sorter, 0, printer, 0);
+
         runTest(spec);
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 593eac1..6770fd8 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -19,8 +19,10 @@
 import org.junit.Test;
 
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -42,83 +44,94 @@
 
 public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
     /*
-     * TPCH Customer table:
-     * CREATE TABLE CUSTOMER (
-     * C_CUSTKEY INTEGER NOT NULL,
-     * C_NAME VARCHAR(25) NOT NULL,
-     * C_ADDRESS VARCHAR(40) NOT NULL,
-     * C_NATIONKEY INTEGER NOT NULL,
-     * C_PHONE CHAR(15) NOT NULL,
-     * C_ACCTBAL DECIMAL(15,2) NOT NULL,
-     * C_MKTSEGMENT CHAR(10) NOT NULL,
-     * C_COMMENT VARCHAR(117) NOT NULL
-     * );
-     * TPCH Orders table:
-     * CREATE TABLE ORDERS (
-     * O_ORDERKEY INTEGER NOT NULL,
-     * O_CUSTKEY INTEGER NOT NULL,
-     * O_ORDERSTATUS CHAR(1) NOT NULL,
-     * O_TOTALPRICE DECIMAL(15,2) NOT NULL,
-     * O_ORDERDATE DATE NOT NULL,
-     * O_ORDERPRIORITY CHAR(15) NOT NULL,
-     * O_CLERK CHAR(15) NOT NULL,
-     * O_SHIPPRIORITY INTEGER NOT NULL,
-     * O_COMMENT VARCHAR(79) NOT NULL
-     * );
+     * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL ); TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL, O_COMMENT VARCHAR(79) NOT NULL );
      */
 
     @Test
     public void customerOrderCIDJoin() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+        FileSplit[] custSplits = new FileSplit[] {
+            new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl"))
+        };
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+        FileSplit[] ordersSplits = new FileSplit[] {
+            new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl"))
+        };
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
-        PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+            '|', "'\"");
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
         CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
-        PartitionConstraint custPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+            "'\"");
+        PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
-        PartitionConstraint joinPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+            1
+        }, new int[] {
+            0
+        }, new IBinaryHashFunctionFactory[] {
+            StringBinaryHashFunctionFactory.INSTANCE
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, custOrderJoinDesc, 128);
+        PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         join.setPartitionConstraint(joinPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -139,67 +152,213 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
-                new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+            new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
+        };
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
-                new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+            new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+        };
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
-        PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            '|', "'\"");
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
         CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
-        PartitionConstraint custPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            "'\"");
+        PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
-        PartitionConstraint joinPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+            1
+        }, new int[] {
+            0
+        }, new IBinaryHashFunctionFactory[] {
+            StringBinaryHashFunctionFactory.INSTANCE
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, custOrderJoinDesc, 128);
+        PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         join.setPartitionConstraint(joinPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                0
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDJoinAutoExpand() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+            new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
+        };
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+            new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+        };
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
+
+        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
+            '|', "'\"");
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
+        ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+
+        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
+            "'\"");
+        PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
+        custScanner.setPartitionConstraint(custPartitionConstraint);
+
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+            1
+        }, new int[] {
+            0
+        }, new IBinaryHashFunctionFactory[] {
+            StringBinaryHashFunctionFactory.INSTANCE
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, custOrderJoinDesc, 128);
+        join.setPartitionConstraint(new PartitionCountConstraint(2));
+
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
+        printer.setPartitionConstraint(printerPartitionConstraint);
+
+        IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+            new FieldHashPartitionComputerFactory(new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+            new FieldHashPartitionComputerFactory(new int[] {
+                0
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -214,75 +373,114 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
-                new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+            new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
+        };
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
-                new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+            new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+        };
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
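+        // The join output schema is 17 string columns: the 8 customer fields together with the 9 orders fields.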
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
-        PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            '|', "'\"");
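+        // Run the orders scan as two partitions, pinned to NC1 and NC2 to match the two file splits.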
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
         CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
-        PartitionConstraint custPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            "'\"");
+        PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
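+        // Materialize each re-partitioned stream on both nodes before it is handed to the join.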
         MaterializingOperatorDescriptor ordMat = new MaterializingOperatorDescriptor(spec, ordersDesc);
-        ordMat.setPartitionConstraint(new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
+        ordMat.setPartitionConstraint(new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        }));
 
         MaterializingOperatorDescriptor custMat = new MaterializingOperatorDescriptor(spec, custDesc);
-        custMat.setPartitionConstraint(new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
+        custMat.setPartitionConstraint(new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        }));
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
-        PartitionConstraint joinPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
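+        // In-memory hash join of orders (field 1) against customers (field 0) with string hashing/comparison and a hash-table size of 128.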
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+            1
+        }, new int[] {
+            0
+        }, new IBinaryHashFunctionFactory[] {
+            StringBinaryHashFunctionFactory.INSTANCE
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, custOrderJoinDesc, 128);
+        PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         join.setPartitionConstraint(joinPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(ordPartConn, ordScanner, 0, ordMat, 0);
 
         IConnectorDescriptor custPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                0
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(custPartConn, custScanner, 0, custMat, 0);
 
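+        // The materialized streams are already partitioned on the join keys, so the join can be fed through one-to-one connectors without further re-partitioning.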
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);