Fault recovery now works. Added an auto-scheduling strategy.

- Split PartitionConstraint into an abstract base with two subtypes:
  ExplicitPartitionConstraint (a fixed array of LocationConstraints) and
  PartitionCountConstraint (a bare partition count, leaving placement to
  the scheduler); renamed LocationConstraint.ConstraintType to
  LocationConstraintType. A usage sketch follows below.
- JOLJobManagerImpl: JOL table callbacks now enqueue their work onto a
  dedicated job-queue thread instead of making remote calls inline; added
  OperatorCloneCountTable, RankedAvailableNodesTable, and the
  expandpartitioncountconstraint table function so count constraints can
  be expanded over ranked available nodes.
- ConnectionManager/ConnectionEntry: wake the selector when abort
  connections are queued, clear the pending-abort list after it is
  processed, skip read/write dispatch on aborted entries, and only clear
  interestOps on valid selection keys.
- FrameTupleAppender: fixed the slot-length offset used when appending
  projected fields (fSrcSlotsLength -> fTargetSlotsLength).
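
A minimal usage sketch of the new constraint API (PartitionCountConstraint
and ExplicitPartitionConstraint are from this change; the
AbsoluteLocationConstraint constructor and the operator descriptor names
wordCounter/fileScanner are assumed for illustration):

    // Count-based: let the scheduler place four partitions on available nodes.
    wordCounter.setPartitionConstraint(new PartitionCountConstraint(4));

    // Explicit: pin each partition to a specific node controller
    // (hypothetical node ids "nc1"/"nc2").
    fileScanner.setPartitionConstraint(new ExplicitPartitionConstraint(
        new LocationConstraint[] { new AbsoluteLocationConstraint("nc1"),
            new AbsoluteLocationConstraint("nc2") }));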

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@23 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java
index 858bb65..bbd4b4f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java
@@ -27,8 +27,8 @@
     }
 
     @Override
-    public ConstraintType getConstraintType() {
-        return ConstraintType.ABSOLUTE;
+    public LocationConstraintType getConstraintType() {
+        return LocationConstraintType.ABSOLUTE;
     }
 
     public String getLocationId() {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
index c14345f..edfb1c5 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
@@ -28,7 +28,7 @@
     }
 
     @Override
-    public ConstraintType getConstraintType() {
-        return ConstraintType.CHOICE;
+    public LocationConstraintType getConstraintType() {
+        return LocationConstraintType.CHOICE;
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ExplicitPartitionConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ExplicitPartitionConstraint.java
new file mode 100644
index 0000000..16b7b24
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ExplicitPartitionConstraint.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+import java.util.Arrays;
+
+public class ExplicitPartitionConstraint extends PartitionConstraint {
+    private static final long serialVersionUID = 1L;
+
+    private final LocationConstraint[] locationConstraints;
+
+    public ExplicitPartitionConstraint(LocationConstraint[] locationConstraints) {
+        this.locationConstraints = locationConstraints;
+    }
+
+    public LocationConstraint[] getLocationConstraints() {
+        return locationConstraints;
+    }
+
+    @Override
+    public String toString() {
+        return Arrays.deepToString(locationConstraints);
+    }
+
+    @Override
+    public PartitionConstraintType getPartitionConstraintType() {
+        return PartitionConstraintType.EXPLICIT;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
index d34b6b2..1228b90 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
@@ -19,10 +19,10 @@
 public abstract class LocationConstraint implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    public enum ConstraintType {
+    public enum LocationConstraintType {
         ABSOLUTE,
         CHOICE
     }
 
-    public abstract ConstraintType getConstraintType();
+    public abstract LocationConstraintType getConstraintType();
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java
index c825539..82a6f7b 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java
@@ -15,22 +15,14 @@
 package edu.uci.ics.hyracks.api.constraints;
 
 import java.io.Serializable;
-import java.util.Arrays;
 
-public class PartitionConstraint implements Serializable {
+public abstract class PartitionConstraint implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    private final LocationConstraint[] locationConstraints;
+    public abstract PartitionConstraintType getPartitionConstraintType();
 
-    public PartitionConstraint(LocationConstraint[] locationConstraints) {
-        this.locationConstraints = locationConstraints;
-    }
-
-    public LocationConstraint[] getLocationConstraints() {
-        return locationConstraints;
-    }
-
-    public String toString() {
-        return Arrays.deepToString(locationConstraints);
+    public enum PartitionConstraintType {
+        EXPLICIT,
+        COUNT
     }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionCountConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionCountConstraint.java
new file mode 100644
index 0000000..b49d880
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionCountConstraint.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+public class PartitionCountConstraint extends PartitionConstraint {
+    private static final long serialVersionUID = 1L;
+
+    private final int count;
+
+    public PartitionCountConstraint(int count) {
+        this.count = count;
+    }
+
+    public int getCount() {
+        return count;
+    }
+
+    @Override
+    public PartitionConstraintType getPartitionConstraintType() {
+        return PartitionConstraintType.COUNT;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
index 89f7566..ea8ede3 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
@@ -63,43 +63,44 @@
     public boolean dispatch(SelectionKey key) throws IOException {
         if (aborted) {
             recvListener.dataReceived(this);
-        }
-        if (key.isReadable()) {
-            if (LOGGER.isLoggable(Level.FINER)) {
-                LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
-            }
-            int bytesRead = socketChannel.read(readBuffer);
-            if (bytesRead < 0) {
-                recvListener.eos(this);
-                return true;
-            }
-            if (LOGGER.isLoggable(Level.FINER)) {
-                LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
-            }
-            recvListener.dataReceived(this);
-        } else if (key.isWritable()) {
-            synchronized (this) {
-                writeBuffer.flip();
+        } else {
+            if (key.isReadable()) {
                 if (LOGGER.isLoggable(Level.FINER)) {
-                    LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
                 }
-                int bytesWritten = socketChannel.write(writeBuffer);
-                if (bytesWritten < 0) {
+                int bytesRead = socketChannel.read(readBuffer);
+                if (bytesRead < 0) {
+                    recvListener.eos(this);
                     return true;
                 }
                 if (LOGGER.isLoggable(Level.FINER)) {
-                    LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
                 }
-                if (writeBuffer.remaining() <= 0) {
-                    int ops = key.interestOps();
-                    key.interestOps(ops & ~SelectionKey.OP_WRITE);
+                recvListener.dataReceived(this);
+            } else if (key.isWritable()) {
+                synchronized (this) {
+                    writeBuffer.flip();
+                    if (LOGGER.isLoggable(Level.FINER)) {
+                        LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    }
+                    int bytesWritten = socketChannel.write(writeBuffer);
+                    if (bytesWritten < 0) {
+                        return true;
+                    }
+                    if (LOGGER.isLoggable(Level.FINER)) {
+                        LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
+                    }
+                    if (writeBuffer.remaining() <= 0) {
+                        int ops = key.interestOps();
+                        key.interestOps(ops & ~SelectionKey.OP_WRITE);
+                    }
+                    writeBuffer.compact();
+                    notifyAll();
                 }
-                writeBuffer.compact();
-                notifyAll();
+            } else {
+                LOGGER.warning("Spurious event triggered: " + key.readyOps());
+                return true;
             }
-        } else {
-            LOGGER.warning("Spurious event triggered: " + key.readyOps());
-            return true;
         }
         return false;
     }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
index 464a95c..55e2962 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
@@ -187,9 +187,7 @@
                 }
             }
         }
-        synchronized (dataListenerThread) {
-            dataListenerThread.pendingAbortConnections.addAll(abortConnections);
-        }
+        dataListenerThread.addPendingAbortConnections(abortConnections);
     }
 
     private final class NetworkFrameWriter implements IFrameWriter {
@@ -271,11 +269,15 @@
         }
 
         synchronized void addSocketChannel(SocketChannel sc) throws IOException {
-            LOGGER.info("Connection received");
             pendingNewSockets.add(sc);
             selector.wakeup();
         }
 
+        synchronized void addPendingAbortConnections(List<IConnectionEntry> abortConnections) {
+            pendingAbortConnections.addAll(abortConnections);
+            selector.wakeup();
+        }
+
         @Override
         public void run() {
             while (!stopped) {
@@ -309,6 +311,7 @@
                                     connections.remove(ce);
                                 }
                             }
+                            pendingAbortConnections.clear();
                         }
                         if (LOGGER.isLoggable(Level.FINE)) {
                             LOGGER.fine("Selector: " + n);
@@ -335,7 +338,7 @@
                             }
                         }
                     }
-                } catch (IOException e) {
+                } catch (Exception e) {
                     e.printStackTrace();
                 }
             }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
index 2cfcb94..694e61a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
@@ -75,8 +75,10 @@
                 LOGGER.finest("NonDeterministicDataReceiveListener: frame received: sender = " + senderIndex);
             }
             SelectionKey key = entry.getSelectionKey();
-            int ops = key.interestOps();
-            key.interestOps(ops & ~SelectionKey.OP_READ);
+            if (key.isValid()) {
+                int ops = key.interestOps();
+                key.interestOps(ops & ~SelectionKey.OP_READ);
+            }
             readyBits.set(senderIndex);
             notifyAll();
             return;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
index 2ae094e..70f0248 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
@@ -131,7 +131,7 @@
                 int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
                         - accessor.getFieldStartOffset(tIndex, fields[i]);
                 System.arraycopy(accessor.getBuffer().array(), fSrcStart, buffer.array(), tupleDataEndOffset
-                        + fSrcSlotsLength + fStartOffset, fLen);
+                        + fTargetSlotsLength + fStartOffset, fLen);
                 fEndOffset += fLen;
                 buffer.putShort(tupleDataEndOffset + i * 2, (short) fEndOffset);
                 fStartOffset = fEndOffset;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 4335b87..9d7862a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.controller.clustercontroller;
 
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -21,21 +22,27 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
 
 import jol.core.Runtime;
 import jol.types.basic.BasicTupleSet;
 import jol.types.basic.Tuple;
 import jol.types.basic.TupleSet;
 import jol.types.exception.BadKeyException;
+import jol.types.exception.UpdateException;
 import jol.types.table.BasicTable;
 import jol.types.table.EventTable;
+import jol.types.table.Function;
 import jol.types.table.Key;
 import jol.types.table.TableName;
 import edu.uci.ics.hyracks.api.comm.Endpoint;
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
 import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.Direction;
@@ -54,18 +61,26 @@
 import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
 
 public class JOLJobManagerImpl implements IJobManager {
+    private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());
+
     public static final String JOL_SCOPE = "hyrackscc";
 
     private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg";
 
     private final Runtime jolRuntime;
 
+    private final LinkedBlockingQueue<Runnable> jobQueue;
+
     private final JobTable jobTable;
 
+    private final JobQueueThread jobQueueThread;
+
     private final OperatorDescriptorTable odTable;
 
     private final OperatorLocationTable olTable;
 
+    private final OperatorCloneCountTable ocTable;
+
     private final ConnectorDescriptorTable cdTable;
 
     private final ActivityNodeTable anTable;
@@ -88,17 +103,28 @@
 
     private final AvailableNodesTable availableNodesTable;
 
+    private final RankedAvailableNodesTable rankedAvailableNodesTable;
+
     private final FailedNodesTable failedNodesTable;
 
     private final AbortMessageTable abortMessageTable;
 
     private final AbortNotifyTable abortNotifyTable;
 
+    private final ExpandPartitionCountConstraintTableFunction expandPartitionCountConstraintFunction;
+
+    private final List<String> rankedAvailableNodes;
+
     public JOLJobManagerImpl(final ClusterControllerService ccs, final Runtime jolRuntime) throws Exception {
         this.jolRuntime = jolRuntime;
+        jobQueue = new LinkedBlockingQueue<Runnable>();
+        jobQueueThread = new JobQueueThread();
+        jobQueueThread.start();
+
         this.jobTable = new JobTable(jolRuntime);
         this.odTable = new OperatorDescriptorTable(jolRuntime);
         this.olTable = new OperatorLocationTable(jolRuntime);
+        this.ocTable = new OperatorCloneCountTable(jolRuntime);
         this.cdTable = new ConnectorDescriptorTable(jolRuntime);
         this.anTable = new ActivityNodeTable(jolRuntime);
         this.acTable = new ActivityConnectionTable(jolRuntime);
@@ -110,13 +136,17 @@
         this.stageletCompleteTable = new StageletCompleteTable(jolRuntime);
         this.stageletFailureTable = new StageletFailureTable(jolRuntime);
         this.availableNodesTable = new AvailableNodesTable(jolRuntime);
+        this.rankedAvailableNodesTable = new RankedAvailableNodesTable(jolRuntime);
         this.failedNodesTable = new FailedNodesTable(jolRuntime);
         this.abortMessageTable = new AbortMessageTable(jolRuntime);
         this.abortNotifyTable = new AbortNotifyTable(jolRuntime);
+        this.expandPartitionCountConstraintFunction = new ExpandPartitionCountConstraintTableFunction();
+        this.rankedAvailableNodes = new ArrayList<String>();
 
         jolRuntime.catalog().register(jobTable);
         jolRuntime.catalog().register(odTable);
         jolRuntime.catalog().register(olTable);
+        jolRuntime.catalog().register(ocTable);
         jolRuntime.catalog().register(cdTable);
         jolRuntime.catalog().register(anTable);
         jolRuntime.catalog().register(acTable);
@@ -128,9 +158,11 @@
         jolRuntime.catalog().register(stageletCompleteTable);
         jolRuntime.catalog().register(stageletFailureTable);
         jolRuntime.catalog().register(availableNodesTable);
+        jolRuntime.catalog().register(rankedAvailableNodesTable);
         jolRuntime.catalog().register(failedNodesTable);
         jolRuntime.catalog().register(abortMessageTable);
         jolRuntime.catalog().register(abortNotifyTable);
+        jolRuntime.catalog().register(expandPartitionCountConstraintFunction);
 
         jobTable.register(new JobTable.Callback() {
             @Override
@@ -153,83 +185,96 @@
             @SuppressWarnings("unchecked")
             @Override
             public void insertion(TupleSet tuples) {
-                try {
-                    for (Tuple t : tuples) {
-                        Object[] data = t.toArray();
-                        UUID jobId = (UUID) data[0];
-                        UUID stageId = (UUID) data[1];
-                        Integer attempt = (Integer) data[2];
-                        JobPlan plan = (JobPlan) data[3];
-                        Set<List> ts = (Set<List>) data[4];
-                        Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
-                        for (List t2 : ts) {
-                            Object[] t2Data = t2.toArray();
-                            Set<List> activityInfoSet = (Set<List>) t2Data[1];
-                            for (List l : activityInfoSet) {
-                                Object[] lData = l.toArray();
-                                ActivityNodeId aid = (ActivityNodeId) lData[0];
-                                Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
-                                if (opParts == null) {
-                                    opParts = new HashSet<Integer>();
-                                    opPartitions.put(aid.getOperatorDescriptorId(), opParts);
+                for (final Tuple t : tuples) {
+                    jobQueue.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                Object[] data = t.toArray();
+                                UUID jobId = (UUID) data[0];
+                                UUID stageId = (UUID) data[1];
+                                Integer attempt = (Integer) data[2];
+                                JobPlan plan = (JobPlan) data[3];
+                                Set<List> ts = (Set<List>) data[4];
+                                Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                    for (List l : activityInfoSet) {
+                                        Object[] lData = l.toArray();
+                                        ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                        Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
+                                        if (opParts == null) {
+                                            opParts = new HashSet<Integer>();
+                                            opPartitions.put(aid.getOperatorDescriptorId(), opParts);
+                                        }
+                                        opParts.add((Integer) lData[1]);
+                                    }
                                 }
-                                opParts.add((Integer) lData[1]);
+                                ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
+                                    .size()];
+                                int i = 0;
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+                                    Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                    for (List l : activityInfoSet) {
+                                        Object[] lData = l.toArray();
+                                        ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                        Set<Integer> aParts = tasks.get(aid);
+                                        if (aParts == null) {
+                                            aParts = new HashSet<Integer>();
+                                            tasks.put(aid, aParts);
+                                        }
+                                        aParts.add((Integer) lData[1]);
+                                    }
+                                    p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
+                                        plan, stageId, attempt, tasks, opPartitions);
+                                }
+                                LOGGER.info("Stage start - Phase 1");
+                                Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
+                                    new ClusterControllerService.PortMapMergingAccumulator());
+
+                                ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
+                                    .size()];
+                                ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
+                                    .size()];
+                                ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
+                                    .size()];
+                                i = 0;
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+                                    Set<List> activityInfoSet = (Set<List>) t2Data[1];
+                                    for (List l : activityInfoSet) {
+                                        Object[] lData = l.toArray();
+                                        ActivityNodeId aid = (ActivityNodeId) lData[0];
+                                        Set<Integer> aParts = tasks.get(aid);
+                                        if (aParts == null) {
+                                            aParts = new HashSet<Integer>();
+                                            tasks.put(aid, aParts);
+                                        }
+                                        aParts.add((Integer) lData[1]);
+                                    }
+                                    p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
+                                        plan, stageId, tasks, opPartitions, globalPortMap);
+                                    p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
+                                        stageId);
+                                    ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId,
+                                        stageId);
+                                    ++i;
+                                }
+                                LOGGER.info("Stage start - Phase 2");
+                                ccs.runRemote(p2is, null);
+                                LOGGER.info("Stage start - Phase 3");
+                                ccs.runRemote(p3is, null);
+                                LOGGER.info("Stage start");
+                                ccs.runRemote(ss, null);
+                                LOGGER.info("Stage started");
+                            } catch (Exception e) { // note: stage-start failures are silently swallowed here
                             }
                         }
-                        ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
-                            .size()];
-                        int i = 0;
-                        for (List t2 : ts) {
-                            Object[] t2Data = t2.toArray();
-                            Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
-                            Set<List> activityInfoSet = (Set<List>) t2Data[1];
-                            for (List l : activityInfoSet) {
-                                Object[] lData = l.toArray();
-                                ActivityNodeId aid = (ActivityNodeId) lData[0];
-                                Set<Integer> aParts = tasks.get(aid);
-                                if (aParts == null) {
-                                    aParts = new HashSet<Integer>();
-                                    tasks.put(aid, aParts);
-                                }
-                                aParts.add((Integer) lData[1]);
-                            }
-                            p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId, plan,
-                                stageId, attempt, tasks, opPartitions);
-                        }
-                        Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
-                            new ClusterControllerService.PortMapMergingAccumulator());
-                        ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
-                            .size()];
-                        ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
-                            .size()];
-                        ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
-                            .size()];
-                        i = 0;
-                        for (List t2 : ts) {
-                            Object[] t2Data = t2.toArray();
-                            Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
-                            Set<List> activityInfoSet = (Set<List>) t2Data[1];
-                            for (List l : activityInfoSet) {
-                                Object[] lData = l.toArray();
-                                ActivityNodeId aid = (ActivityNodeId) lData[0];
-                                Set<Integer> aParts = tasks.get(aid);
-                                if (aParts == null) {
-                                    aParts = new HashSet<Integer>();
-                                    tasks.put(aid, aParts);
-                                }
-                                aParts.add((Integer) lData[1]);
-                            }
-                            p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId, plan,
-                                stageId, tasks, opPartitions, globalPortMap);
-                            p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId, stageId);
-                            ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId, stageId);
-                            ++i;
-                        }
-                        ccs.runRemote(p2is, null);
-                        ccs.runRemote(p3is, null);
-                        ccs.runRemote(ss, null);
-                    }
-                } catch (Exception e) {
+                    });
                 }
             }
         });
@@ -242,22 +287,32 @@
             @SuppressWarnings("unchecked")
             @Override
             public void insertion(TupleSet tuples) {
-                try {
-                    for (Tuple t : tuples) {
-                        Object[] data = t.toArray();
-                        UUID jobId = (UUID) data[0];
-                        Set<String> ts = (Set<String>) data[1];
-                        ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
-                            .size()];
-                        int i = 0;
-                        for (String n : ts) {
-                            jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
+                for (final Tuple t : tuples) {
+                    jobQueue.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                Object[] data = t.toArray();
+                                UUID jobId = (UUID) data[0];
+                                Set<String> ts = (Set<String>) data[1];
+                                ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
+                                    .size()];
+                                int i = 0;
+                                for (String n : ts) {
+                                    jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
+                                }
+                                try {
+                                    ccs.runRemote(jcns, null);
+                                } finally {
+                                    BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable
+                                        .createTuple(jobId));
+                                    jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
+                                    jolRuntime.evaluate();
+                                }
+                            } catch (Exception e) { // note: cleanup failures are silently swallowed here
+                            }
                         }
-                        ccs.runRemote(jcns, null);
-                        BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable.createTuple(jobId));
-                        jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
-                    }
-                } catch (Exception e) {
+                    });
                 }
             }
         });
@@ -271,28 +326,39 @@
             @SuppressWarnings("unchecked")
             @Override
             public void insertion(TupleSet tuples) {
-                try {
-                    for (Tuple t : tuples) {
-                        Object[] data = t.toArray();
-                        UUID jobId = (UUID) data[0];
-                        UUID stageId = (UUID) data[1];
-                        Integer attempt = (Integer) data[2];
-                        Set<List> ts = (Set<List>) data[4];
-                        ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
-                            .size()];
-                        int i = 0;
-                        BasicTupleSet notificationTuples = new BasicTupleSet();
-                        for (List t2 : ts) {
-                            Object[] t2Data = t2.toArray();
-                            String nodeId = (String) t2Data[0];
-                            jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId, attempt);
-                            notificationTuples.add(AbortNotifyTable.createTuple(jobId, stageId, nodeId, attempt));
+                for (final Tuple t : tuples) {
+                    jobQueue.add(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                Object[] data = t.toArray();
+                                UUID jobId = (UUID) data[0];
+                                UUID stageId = (UUID) data[1];
+                                Integer attempt = (Integer) data[2];
+                                Set<List> ts = (Set<List>) data[4];
+                                ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
+                                    .size()];
+                                int i = 0;
+                                BasicTupleSet notificationTuples = new BasicTupleSet();
+                                for (List t2 : ts) {
+                                    Object[] t2Data = t2.toArray();
+                                    String nodeId = (String) t2Data[0];
+                                    jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId,
+                                        attempt);
+                                    notificationTuples.add(AbortNotifyTable
+                                        .createTuple(jobId, stageId, nodeId, attempt));
+                                }
+                                try {
+                                    ccs.runRemote(jas, null);
+                                } finally {
+                                    jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples,
+                                        null);
+                                    jolRuntime.evaluate();
+                                }
+                            } catch (Exception e) { // note: abort failures are silently swallowed here
+                            }
                         }
-                        ccs.runRemote(jas, null);
-
-                        jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples, null);
-                    }
-                } catch (Exception e) {
+                    });
                 }
             }
         });
@@ -302,7 +368,7 @@
     }
 
     @Override
-    public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+    public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
         final UUID jobId = UUID.randomUUID();
 
         final JobPlanBuilder builder = new JobPlanBuilder();
@@ -341,15 +407,11 @@
 
         BasicTupleSet odTuples = new BasicTupleSet();
         BasicTupleSet olTuples = new BasicTupleSet();
+        BasicTupleSet ocTuples = new BasicTupleSet();
         for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : jobSpec.getOperatorMap().entrySet()) {
             IOperatorDescriptor od = e.getValue();
-            PartitionConstraint pc = od.getPartitionConstraint();
-            LocationConstraint[] locationConstraints = pc.getLocationConstraints();
-            int nPartitions = locationConstraints.length;
+            int nPartitions = addPartitionConstraintTuples(jobId, od, olTuples, ocTuples);
             odTuples.add(OperatorDescriptorTable.createTuple(jobId, nPartitions, od));
-            for (int i = 0; i < locationConstraints.length; ++i) {
-                addLocationConstraintTuple(olTuples, jobId, od.getOperatorId(), i, locationConstraints[i]);
-            }
             od.contributeTaskGraph(gBuilder);
         }
 
@@ -363,6 +425,7 @@
         jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null);
         jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, odTuples, null);
         jolRuntime.schedule(JOL_SCOPE, OperatorLocationTable.TABLE_NAME, olTuples, null);
+        jolRuntime.schedule(JOL_SCOPE, OperatorCloneCountTable.TABLE_NAME, ocTuples, null);
         jolRuntime.schedule(JOL_SCOPE, ConnectorDescriptorTable.TABLE_NAME, cdTuples, null);
         jolRuntime.schedule(JOL_SCOPE, ActivityNodeTable.TABLE_NAME, anTuples, null);
         jolRuntime.schedule(JOL_SCOPE, ActivityConnectionTable.TABLE_NAME, acTuples, null);
@@ -373,17 +436,39 @@
         return jobId;
     }
 
+    private int addPartitionConstraintTuples(UUID jobId, IOperatorDescriptor od, BasicTupleSet olTuples,
+        BasicTupleSet ocTuples) {
+        PartitionConstraint pc = od.getPartitionConstraint();
+
+        switch (pc.getPartitionConstraintType()) {
+            case COUNT:
+                int count = ((PartitionCountConstraint) pc).getCount();
+                ocTuples.add(OperatorCloneCountTable.createTuple(jobId, od.getOperatorId(), count));
+                return count;
+
+            case EXPLICIT:
+                LocationConstraint[] locationConstraints = ((ExplicitPartitionConstraint) pc).getLocationConstraints();
+                for (int i = 0; i < locationConstraints.length; ++i) {
+                    addLocationConstraintTuple(olTuples, jobId, od.getOperatorId(), i, locationConstraints[i], 0);
+                }
+                return locationConstraints.length;
+        }
+        throw new IllegalArgumentException("Unknown partition constraint type: " + pc.getPartitionConstraintType());
+    }
+
     private void addLocationConstraintTuple(BasicTupleSet olTuples, UUID jobId, OperatorDescriptorId opId, int i,
-        LocationConstraint locationConstraint) {
+        LocationConstraint locationConstraint, int benefit) {
         switch (locationConstraint.getConstraintType()) {
             case ABSOLUTE:
                 String nodeId = ((AbsoluteLocationConstraint) locationConstraint).getLocationId();
-                olTuples.add(OperatorLocationTable.createTuple(jobId, opId, nodeId, i));
+                olTuples.add(OperatorLocationTable.createTuple(jobId, opId, nodeId, i, benefit));
                 break;
 
             case CHOICE:
+                int index = 0;
                 for (LocationConstraint lc : ((ChoiceLocationConstraint) locationConstraint).getChoices()) {
-                    addLocationConstraintTuple(olTuples, jobId, opId, i, lc);
+                    addLocationConstraintTuple(olTuples, jobId, opId, i, lc, benefit - index);
+                    index++;
                 }
         }
     }
@@ -404,7 +489,30 @@
     }
 
     @Override
-    public void notifyNodeFailure(String nodeId) throws Exception {
+    public synchronized void notifyNodeFailure(String nodeId) throws Exception {
+        int len = rankedAvailableNodes.size();
+        int delIndex = -1;
+        for (int i = 0; i < len; ++i) {
+            if (nodeId.equals(rankedAvailableNodes.get(i))) {
+                delIndex = i;
+                break;
+            }
+        }
+        if (delIndex < 0) {
+            return;
+        }
+        BasicTupleSet delRANTuples = new BasicTupleSet();
+        delRANTuples.add(RankedAvailableNodesTable.createTuple(nodeId, delIndex));
+
+        BasicTupleSet insRANTuples = new BasicTupleSet();
+        for (int i = delIndex + 1; i < len; ++i) {
+            insRANTuples.add(RankedAvailableNodesTable.createTuple(rankedAvailableNodes.get(i), i - 1));
+        }
+
+        rankedAvailableNodes.remove(delIndex);
+
+        jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, delRANTuples);
+
         BasicTupleSet unavailableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
 
         jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, null, unavailableTuples);
@@ -419,7 +527,7 @@
     }
 
     @Override
-    public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
         StageletStatistics statistics) throws Exception {
         BasicTupleSet scTuples = new BasicTupleSet();
         scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics));
@@ -430,8 +538,7 @@
     }
 
     @Override
-    public synchronized void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId)
-        throws Exception {
+    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
         BasicTupleSet sfTuples = new BasicTupleSet();
         sfTuples.add(StageletFailureTable.createTuple(jobId, stageId, nodeId, attempt));
 
@@ -441,7 +548,7 @@
     }
 
     @Override
-    public synchronized void start(UUID jobId) throws Exception {
+    public void start(UUID jobId) throws Exception {
         BasicTupleSet jsTuples = new BasicTupleSet();
         jsTuples.add(JobStartTable.createTuple(jobId, System.currentTimeMillis()));
 
@@ -451,7 +558,13 @@
     }
 
     @Override
-    public void registerNode(String nodeId) throws Exception {
+    public synchronized void registerNode(String nodeId) throws Exception {
+        rankedAvailableNodes.add(nodeId);
+        BasicTupleSet insRANTuples = new BasicTupleSet();
+        insRANTuples.add(RankedAvailableNodesTable.createTuple(nodeId, rankedAvailableNodes.size() - 1));
+
+        jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, null);
+
         BasicTupleSet availableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
 
         jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, availableTuples, null);
@@ -555,15 +668,37 @@
 
         @SuppressWarnings("unchecked")
         private static final Class[] SCHEMA = new Class[] {
-            UUID.class, OperatorDescriptorId.class, String.class, Integer.class
+            UUID.class, OperatorDescriptorId.class, String.class, Integer.class, Integer.class
         };
 
         public OperatorLocationTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
         }
 
-        static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId, int partition) {
-            return new Tuple(jobId, opId, nodeId, partition);
+        static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId, int partition, int benefit) {
+            return new Tuple(jobId, opId, nodeId, partition, benefit);
+        }
+    }
+
+    /*
+     * declare(operatorclonecount, keys(0, 1), {JobId, ODId, Count})
+     */
+    private static class OperatorCloneCountTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorclonecount");
+
+        private static Key PRIMARY_KEY = new Key(0, 1);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, Integer.class
+        };
+
+        public OperatorCloneCountTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, int cloneCount) {
+            return new Tuple(jobId, opId, cloneCount);
         }
     }
 
@@ -821,6 +956,28 @@
     }
 
     /*
+     * declare(rankedavailablenodes, keys(0), {NodeId, Integer})
+     */
+    private static class RankedAvailableNodesTable extends BasicTable {
+        private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "rankedavailablenodes");
+
+        private static Key PRIMARY_KEY = new Key(0);
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            String.class, Integer.class
+        };
+
+        public RankedAvailableNodesTable(Runtime context) {
+            super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+        }
+
+        public static Tuple createTuple(String nodeId, int rank) {
+            return new Tuple(nodeId, rank);
+        }
+    }
+
+    /*
      * declare(failednodes, keys(0), {NodeId})
      */
     private static class FailedNodesTable extends BasicTable {
@@ -881,4 +1038,52 @@
             return new Tuple(jobId, stageId, nodeId, attempt);
         }
     }
+
+    private static class ExpandPartitionCountConstraintTableFunction extends Function {
+        private static final String TABLE_NAME = "expandpartitioncountconstraint";
+
+        @SuppressWarnings("unchecked")
+        private static final Class[] SCHEMA = new Class[] {
+            UUID.class, OperatorDescriptorId.class, Integer.class, Integer.class
+        };
+
+        public ExpandPartitionCountConstraintTableFunction() {
+            super(TABLE_NAME, SCHEMA);
+        }
+
+        @Override
+        public TupleSet insert(TupleSet tuples, TupleSet conflicts) throws UpdateException {
+            TupleSet result = new BasicTupleSet();
+            int counter = 0;
+            for (Tuple t : tuples) {
+                int nPartitions = (Integer) t.value(2);
+                for (int i = 0; i < nPartitions; ++i) {
+                    result.add(new Tuple(t.value(0), t.value(1), i, counter++));
+                }
+            }
+            return result;
+        }
+    }
+
+    private class JobQueueThread extends Thread {
+        public JobQueueThread() {
+            setDaemon(true);
+        }
+
+        public void run() {
+            Runnable r;
+            while (true) {
+                try {
+                    r = jobQueue.take();
+                } catch (InterruptedException e) {
+                    continue;
+                }
+                try {
+                    r.run();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index c202b3d..195fe00 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -288,6 +288,10 @@
                                 PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
                                     Direction.INPUT, spec.getConsumerInputIndex(conn), index);
                                 Endpoint ep = globalPortMap.get(piId);
+                                if (ep == null) {
+                                    LOGGER.info("Got null Endpoint for " + piId);
+                                    throw new NullPointerException("No endpoint registered for " + piId);
+                                }
                                 if (LOGGER.isLoggable(Level.FINEST)) {
                                     LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
                                 }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
index 73667ed..e16cf30 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -117,13 +117,14 @@
                         + opIId.getOperatorId() + ":" + opIId.getPartition());
                 } catch (Exception e) {
                     e.printStackTrace();
-                    notifyOperatorFailure(opIId);
+                    // notifyOperatorFailure(opIId);
                 }
                 try {
                     hon.run();
                     notifyOperatorCompletion(opIId);
                 } catch (Exception e) {
-                    notifyOperatorFailure(opIId);
+                    e.printStackTrace();
+                    // notifyOperatorFailure(opIId);
                 }
             }
         });
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
index 86c247d..b2dc234 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
@@ -34,8 +34,7 @@
 
 class GroupingHashTable {
     /**
-     * The pointers in the link store 3 int values for each entry in the
-     * hashtable: (bufferIdx, tIndex, accumulatorIdx).
+     * The pointers in the link store 3 int values for each entry in the hashtable: (bufferIdx, tIndex, accumulatorIdx).
      * 
      * @author vinayakb
      */
@@ -81,8 +80,8 @@
     private final FrameTupleAccessor storedKeysAccessor;
 
     GroupingHashTable(HyracksContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
-            ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
-            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
+        ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
+        RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
         this.ctx = ctx;
         appender = new FrameTupleAppender(ctx);
         buffers = new ArrayList<ByteBuffer>();
@@ -161,7 +160,7 @@
             }
             int saIndex = accumulatorSize++;
             aggregator = accumulators[saIndex] = aggregatorFactory.createAggregator(inRecordDescriptor,
-                    outRecordDescriptor);
+                outRecordDescriptor);
             aggregator.init(accessor, tIndex);
             link.add(sbIndex, stIndex, saIndex);
         }
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java
index c1b917d..7ec0eab 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.util.ReflectionUtils;
 
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.controller.IClusterController;
@@ -61,7 +62,7 @@
         private Object value;
 
         public HDFSCustomReader(Map<String, String> jobConfMap, HadoopFileSplit inputSplit,
-                String inputFormatClassName, Reporter reporter) {
+            String inputFormatClassName, Reporter reporter) {
             try {
                 JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap) jobConfMap);
                 FileSystem fileSystem = null;
@@ -74,7 +75,7 @@
                 Class inputFormatClass = Class.forName(inputFormatClassName);
                 InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
                 hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(getFileSplit(inputSplit), conf,
-                        reporter);
+                    reporter);
                 if (hadoopRecordReader instanceof SequenceFileRecordReader) {
                     inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
                     inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
@@ -130,27 +131,27 @@
 
         private FileSplit getFileSplit(HadoopFileSplit hadoopFileSplit) {
             FileSplit fileSplit = new FileSplit(new Path(hadoopFileSplit.getFile()), hadoopFileSplit.getStart(),
-                    hadoopFileSplit.getLength(), hadoopFileSplit.getHosts());
+                hadoopFileSplit.getLength(), hadoopFileSplit.getHosts());
             return fileSplit;
         }
     }
 
     public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, JobSpecification spec,
-            HadoopFileSplit[] splits, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+        HadoopFileSplit[] splits, String inputFormatClassName, RecordDescriptor recordDescriptor) {
         super(spec, splits, recordDescriptor);
         this.inputFormatClassName = inputFormatClassName;
         this.jobConfMap = jobConfMap;
     }
 
     public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, InetSocketAddress nameNode,
-            JobSpecification spec, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+        JobSpecification spec, String inputFormatClassName, RecordDescriptor recordDescriptor) {
         super(spec, null, recordDescriptor);
         this.inputFormatClassName = inputFormatClassName;
         this.jobConfMap = jobConfMap;
     }
 
     public HadoopReadOperatorDescriptor(IClusterController clusterController, Map<String, String> jobConfMap,
-            JobSpecification spec, String fileSystemURL, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+        JobSpecification spec, String fileSystemURL, String inputFormatClassName, RecordDescriptor recordDescriptor) {
         super(spec, null, recordDescriptor);
         HadoopAdapter hadoopAdapter = HadoopAdapter.getInstance(fileSystemURL);
         String inputPathString = jobConfMap.get("mapred.input.dir");
@@ -170,7 +171,7 @@
     }
 
     private void configurePartitionConstraints(IClusterController clusterController,
-            Map<String, List<HadoopFileSplit>> blocksToRead) {
+        Map<String, List<HadoopFileSplit>> blocksToRead) {
         List<LocationConstraint> locationConstraints = new ArrayList<LocationConstraint>();
         Map<String, INodeController> registry = null;
         try {
@@ -223,8 +224,8 @@
             }
         }
 
-        PartitionConstraint partitionConstraint = new PartitionConstraint(locationConstraints
-                .toArray(new LocationConstraint[] {}));
+        PartitionConstraint partitionConstraint = new ExplicitPartitionConstraint(locationConstraints
+            .toArray(new LocationConstraint[] {}));
         this.setPartitionConstraint(partitionConstraint);
     }
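The hunk above replaces direct construction of the abstract PartitionConstraint with the ExplicitPartitionConstraint subclass introduced by this commit, keeping the one-LocationConstraint-per-partition shape of the old code. A minimal usage sketch of that pattern, assuming only the classes visible in this diff (the operator variable and node ids are illustrative):

    // Sketch: pin a two-partition operator to two named node controllers.
    LocationConstraint[] locations = new LocationConstraint[] {
        new AbsoluteLocationConstraint("NC1"),
        new AbsoluteLocationConstraint("NC2")
    };
    PartitionConstraint constraint = new ExplicitPartitionConstraint(locations);
    someOperator.setPartitionConstraint(constraint); // hypothetical operator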
 
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index 3813143..8b7bcff 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -112,20 +112,76 @@
 watch(stagestart, a);
 watch(stagestart, d);
 
-define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, String});
+define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, Integer});
 
 operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit) :-
-    operatorlocation(JobId, OperatorId, NodeId, Partition),
-    availablenodes(NodeId)
+    operatorlocation(JobId, OperatorId, NodeId, Partition, Benefit),
+    availablenodes(NodeId);
+
+watch(availablenodes, a);
+watch(availablenodes, i);
+watch(availablenodes, d);
+
+/*
+
+define(rankedavailablenodes_temp, keys(), {String, Integer});
+
+rankedavailablenodes_temp(NodeId, 0) :-
+    availablenodes#insert(NodeId);
+
+rankedavailablenodes_temp(NodeId2, NewRank) :-
+    rankedavailablenodes_temp#insert(NodeId1, Rank),
+    rankedavailablenodes_temp(NodeId2, Rank),
+    NodeId1 < NodeId2
     {
-        Benefit := NodeId;
+        NewRank := Rank + 1;
     };
 
+rankedavailablenodes_temp(NodeId2, NewRank) :-
+    rankedavailablenodes_temp(NodeId1, Rank),
+    rankedavailablenodes_temp#insert(NodeId2, Rank),
+    NodeId1 < NodeId2
+    {
+        NewRank := Rank + 1;
+    };
+
+rankedavailablenodes_temp(NodeId, Rank) :-
+    availablenodes(NodeId),
+    rankedavailablenodes_temp(NodeId, Rank);
+
+rankedavailablenodes_temp(NodeId, NewRank) :-
+    rankedavailablenodes_temp(NodeId, Rank),
+    Rank != 0,
+    notin rankedavailablenodes_temp(_, Rank - 1)
+    {
+        NewRank := Rank - 1;
+    };
+
+define(rankedavailablenodes, keys(0), {String, Integer});
+
+rankedavailablenodes(NodeId, max<Rank>) :-
+    rankedavailablenodes_temp(NodeId, Rank);
+
+*/
+
+define(availablenodecount, keys(0), {Integer, Integer});
+
+watch(availablenodecount, a);
+watch(availablenodecount, i);
+watch(availablenodecount, d);
+
+availablenodecount(0, count<NodeId>) :-
+    availablenodes(NodeId);
+
+watch(rankedavailablenodes, a);
+watch(rankedavailablenodes, i);
+watch(rankedavailablenodes, d);
+
 watch(operatorlocationcandidates, a);
 watch(operatorlocationcandidates, i);
 watch(operatorlocationcandidates, d);
 
-define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, String});
+define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
 
 maxoperatorlocationbenefit(JobId, OperatorId, Partition, max<Benefit>) :-
     operatorlocationcandidates(JobId, OperatorId, _, Partition, Benefit);
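In the scheduler hunk above, operatorlocationcandidates now joins a real Integer Benefit out of operatorlocation instead of reusing the NodeId string as a stand-in, and availablenodecount maintains a count&lt;NodeId&gt; aggregate over availablenodes; an in-Overlog ranking of available nodes is sketched but commented out, so rankedavailablenodes is presumably populated by other means. maxoperatorlocationbenefit then keeps, per (job, operator, partition), the best benefit among candidates on live nodes. A hedged Java sketch of that selection (Candidate, decide, and place-holder types are illustrative, not the Hyracks API):

    import java.util.*;

    class MaxBenefitSketch {
        static class Candidate {
            final String nodeId; final int partition; final int benefit;
            Candidate(String n, int p, int b) { nodeId = n; partition = p; benefit = b; }
        }

        // Keep candidates on available nodes; choose a node whose benefit
        // equals the per-partition maximum (mirrors the two rules above).
        static Map<Integer, String> decide(List<Candidate> cands, Set<String> live) {
            Map<Integer, Integer> best = new HashMap<>();
            for (Candidate c : cands)
                if (live.contains(c.nodeId))
                    best.merge(c.partition, c.benefit, Math::max);
            Map<Integer, String> decision = new HashMap<>();
            for (Candidate c : cands)
                if (live.contains(c.nodeId) && best.get(c.partition) == c.benefit)
                    decision.putIfAbsent(c.partition, c.nodeId);
            return decision;
        }
    }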
@@ -134,16 +190,81 @@
 watch(maxoperatorlocationbenefit, i);
 watch(maxoperatorlocationbenefit, d);
 
-define(operatorlocationdecision, keys(0, 1, 3), {UUID, OperatorDescriptorId, String, Integer});
+define(attemptoperatorlocationdecision, keys(0, 1, 3, 4), {UUID, OperatorDescriptorId, String, Integer, Integer});
 
-watch(operatorlocationdecision, a);
-watch(operatorlocationdecision, i);
-watch(operatorlocationdecision, d);
+watch(attemptoperatorlocationdecision, a);
+watch(attemptoperatorlocationdecision, i);
+watch(attemptoperatorlocationdecision, d);
 
-operatorlocationdecision(JobId, OperatorId, NodeId, Partition) :-
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+    jobattempt#insert(JobId, Attempt),
     operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit),
     maxoperatorlocationbenefit(JobId, OperatorId, Partition, Benefit);
 
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+    jobattempt#insert(JobId, Attempt),
+    operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, CloneRank),
+    rankedavailablenodes(NodeId, NodeRank),
+    availablenodecount(_, NodeCount),
+    NodeRank == CloneRank % NodeCount;
+
+define(operatorclonecount_temp, keys(), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecount_temp(JobId, OperatorId, NPartitions, 0) :-
+    operatorclonecount(JobId, OperatorId, NPartitions);
+
+/*
+define(operatorclonecountexpansion, keys(0, 1), {UUID, OperatorDescriptorId, Integer});
+
+operatorclonecountexpansion(JobId, OperatorId, 0) :-
+    operatorclonecount(JobId, OperatorId, _);
+
+operatorclonecountexpansion(JobId, OperatorId, Partition + 1) :-
+    operatorclonecountexpansion#insert(JobId, OperatorId, Partition),
+    operatorclonecount(JobId, OperatorId, NPartitions),
+    Partition < NPartitions - 1;
+*/
+
+define(operatorclonecountexpansiontotalorder, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+/*
+operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, 0) :-
+    operatorclonecountexpansion#insert(JobId, OperatorId, Partition);
+
+operatorclonecountexpansiontotalorder(JobId, OperatorId2, Partition2, NewRank) :-
+    operatorclonecountexpansiontotalorder#insert(JobId, OperatorId1, Partition1, Rank),
+    operatorclonecountexpansiontotalorder(JobId, OperatorId2, Partition2, Rank),
+    OperatorId1.hashCode() < OperatorId2.hashCode() || (OperatorId1.hashCode() == OperatorId2.hashCode() && Partition1 < Partition2)
+    {
+        NewRank := Rank + 1;
+    };
+
+operatorclonecountexpansiontotalorder(JobId, OperatorId2, Partition2, NewRank) :-
+    operatorclonecountexpansiontotalorder(JobId, OperatorId1, Partition1, Rank),
+    operatorclonecountexpansiontotalorder#insert(JobId, OperatorId2, Partition2, Rank),
+    OperatorId1.hashCode() < OperatorId2.hashCode() || (OperatorId1.hashCode() == OperatorId2.hashCode() && Partition1 < Partition2)
+    {
+        NewRank := Rank + 1;
+    };
+*/
+
+operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, Rank) :-
+    expandpartitioncountconstraint(operatorclonecount_temp(JobId, OperatorId, Partition, Rank));
+
+watch(operatorclonecountexpansiontotalorder, a);
+watch(operatorclonecountexpansiontotalorder, i);
+watch(operatorclonecountexpansiontotalorder, d);
+
+watch(operatorclonecount, a);
+watch(operatorclonecount, i);
+watch(operatorclonecount, d);
+
+/*
+watch(operatorclonecountexpansion, a);
+watch(operatorclonecountexpansion, i);
+watch(operatorclonecountexpansion, d);
+*/
+
 define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
 
 activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
@@ -151,7 +272,7 @@
     operatordescriptor(JobId, OperatorId, _, _),
     activitystage(JobId, OperatorId, ActivityId, StageNumber),
     jobstage(JobId, StageNumber, StageId),
-    operatorlocationdecision(JobId, OperatorId, NodeId, Partition);
+    attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt);
 
 watch(activitystart, a);
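The second attemptoperatorlocationdecision rule in the hunk above is the auto-scheduling strategy named in the commit message: each operator clone, totally ordered via expandpartitioncountconstraint, is matched to the available node whose rank satisfies NodeRank == CloneRank % NodeCount, so clones are dealt round-robin across the live nodes of the cluster. A hedged sketch of that arithmetic, where rankedNodes stands in for rankedavailablenodes with the array index as the rank:

    // Round-robin placement: clone i goes to ranked node (i mod nodeCount).
    static String[] roundRobin(String[] rankedNodes, int totalClones) {
        String[] assignment = new String[totalClones];
        for (int cloneRank = 0; cloneRank < totalClones; cloneRank++)
            assignment[cloneRank] = rankedNodes[cloneRank % rankedNodes.length];
        return assignment;
    }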
 
@@ -192,7 +313,7 @@
 stageletabort(JobId, StageId, JobPlan, NodeIdOther, Attempt, ActivityIdSet) :-
     stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, _),
     stageletstart(JobId, StageId, _, NodeIdOther, Attempt, ActivityIdSet),
-    failednodes(NodeId),
+    failednodes#insert(NodeId),
     notin stageletcomplete(JobId, StageId, NodeId, Attempt, _);
 
 watch(stageletabort, a);
@@ -202,19 +323,20 @@
 define(stageabort, keys(0, 1, 2), {UUID, UUID, Integer, Set});
 
 stageabort(JobId, StageId, Attempt, set<NodeId>) :-
-    stageletabort(JobId, StageId, _, NodeId, Attempt, _);
+    stageletabort#insert(JobId, StageId, _, NodeId, Attempt, _);
 
 define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
 
 abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
-    stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+    stageletabort#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
     availablenodes(NodeId)
     {
         Tuple := [NodeId, ActivityIdSet];
     };
 
 abortmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
-    abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet);
+    abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet),
+    TSet.size() != 0;
 
 watch(abortmessage, a);
 watch(abortmessage, i);
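The two hunks above rewire abort handling for fault recovery: stageletabort now fires on the failednodes#insert delta (the moment a failure is recorded) rather than re-matching the stored table, stageabort and abortmessage_agg likewise key off insert deltas, and abortmessage gains a TSet.size() != 0 guard so no abort message is sent when no live node holds a stagelet of the aborted attempt. A hedged sketch of that guard (the helper names are hypothetical):

    // Fan out an abort only to live nodes that actually hold stagelets
    // of the aborted attempt; skip the send entirely when none do.
    Set<String> targets = stageletsOnLiveNodes(jobId, stageId, attempt);
    if (!targets.isEmpty())
        sendAbort(jobId, stageId, attempt, plan, targets);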
@@ -226,7 +348,7 @@
 
 stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
     stageletabort(JobId, StageId, _, NodeId, Attempt, _),
-    failednodes(NodeId);
+    notin availablenodes(NodeId);
 
 define(stageletabortcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
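In the hunk above, stageletabortcomplete now treats a stagelet on any node absent from availablenodes as implicitly abort-complete, not only nodes recorded in failednodes; a dead or departed node can never acknowledge an abort, so waiting on it would wedge recovery. In sketch form (illustrative names):

    // A stagelet's abort completes vacuously when its node is gone.
    boolean abortComplete = !availableNodes.contains(nodeId);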
 
@@ -262,7 +384,7 @@
 jobcleanup_agg(JobId, set<NodeId>) :-
     stagestart#insert(JobId, StageNumber, Attempt),
     stagefinish(JobId, _, Attempt, _),
-    operatorlocationdecision(JobId, _, NodeId, Attempt),
+    attemptoperatorlocationdecision(JobId, _, NodeId, _, Attempt),
     notin jobstage(JobId, StageNumber);
 
 jobcleanup(JobId, NodeIdSet) :-
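Job cleanup above is now keyed by attempt: jobcleanup_agg collects nodes from attemptoperatorlocationdecision matching the finished attempt, with the partition wildcarded, so each retry cleans up exactly the nodes it actually used. A hedged sketch, where Decision is an illustrative record of the decision tuple:

    // Collect the nodes touched by one attempt of a job for cleanup.
    Set<String> cleanupNodes = new HashSet<>();
    for (Decision d : attemptDecisions)
        if (d.jobId.equals(jobId) && d.attempt == attempt)
            cleanupNodes.add(d.nodeId);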
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index a3beb11..b7c2e57 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -19,6 +19,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -49,57 +50,85 @@
     public void countOfCountsSingleNC() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
-        RecordDescriptor desc = new RecordDescriptor(
-                new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+        FileSplit[] splits = new FileSplit[] {
+            new FileSplit(NC1_ID, new File("data/words.txt"))
+        };
+        RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
-        PartitionConstraint csvPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc);
-        PartitionConstraint sorterPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
+            0
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, desc);
+        PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         sorter.setPartitionConstraint(sorterPartitionConstraint);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
-        PartitionConstraint groupPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+            StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
+        });
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+            0
+        }, new IComparatorFactory[] {
+            StringComparatorFactory.INSTANCE
+        }, new SumStringGroupAggregator(0), desc2);
+        PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         group.setPartitionConstraint(groupPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc2);
-        PartitionConstraint sorterPartitionConstraint2 = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] {
+            1
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, desc2);
+        PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         sorter2.setPartitionConstraint(sorterPartitionConstraint2);
 
-        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
-        PartitionConstraint groupPartitionConstraint2 = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+            1
+        }, new IComparatorFactory[] {
+            StringComparatorFactory.INSTANCE
+        }, new SumStringGroupAggregator(1), desc2);
+        PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         group2.setPartitionConstraint(groupPartitionConstraint2);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                0
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, sorter, 0, group, 0);
 
         IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -116,59 +145,91 @@
     public void countOfCountsMultiNC() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
-        RecordDescriptor desc = new RecordDescriptor(
-                new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+        FileSplit[] splits = new FileSplit[] {
+            new FileSplit(NC1_ID, new File("data/words.txt"))
+        };
+        RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
-        PartitionConstraint csvPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc);
-        PartitionConstraint sorterPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
+            0
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, desc);
+        PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID),
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID)
+        });
         sorter.setPartitionConstraint(sorterPartitionConstraint);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
-        PartitionConstraint groupPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
+        });
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+            0
+        }, new IComparatorFactory[] {
+            StringComparatorFactory.INSTANCE
+        }, new SumStringGroupAggregator(0), desc2);
+        PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID),
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID)
+        });
         group.setPartitionConstraint(groupPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc2);
-        PartitionConstraint sorterPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] {
+            1
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, desc2);
+        PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         sorter2.setPartitionConstraint(sorterPartitionConstraint2);
 
-        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
-        PartitionConstraint groupPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+            1
+        }, new IComparatorFactory[] {
+            StringComparatorFactory.INSTANCE
+        }, new SumStringGroupAggregator(1), desc2);
+        PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         group2.setPartitionConstraint(groupPartitionConstraint2);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                0
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, sorter, 0, group, 0);
 
         IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
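In the multi-NC variant above, the first sorter and group operator are given four location constraints over two node controllers (NC1, NC2, NC1, NC2), while the second sorter/group pair get one per node. Under ExplicitPartitionConstraint the partition count presumably equals the array length, so this reads as (illustrative ids):

    // Hedged reading: partition i runs at locations[i], so partitions
    // 0 and 2 land on NC1 and 1 and 3 on NC2, two sorter clones per node.
    String[] locations = { "NC1", "NC2", "NC1", "NC2" };
    for (int p = 0; p < locations.length; p++)
        System.out.println("partition " + p + " -> " + locations[p]);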
@@ -185,59 +246,91 @@
     public void countOfCountsExternalSortMultiNC() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
-        RecordDescriptor desc = new RecordDescriptor(
-                new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+        FileSplit[] splits = new FileSplit[] {
+            new FileSplit(NC1_ID, new File("data/words.txt"))
+        };
+        RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
-        PartitionConstraint csvPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
-        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 0 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc);
-        PartitionConstraint sorterPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] {
+            0
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, desc);
+        PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID),
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID)
+        });
         sorter.setPartitionConstraint(sorterPartitionConstraint);
 
         RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
-        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
-        PartitionConstraint groupPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
+        });
+        PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+            0
+        }, new IComparatorFactory[] {
+            StringComparatorFactory.INSTANCE
+        }, new SumStringGroupAggregator(0), desc2);
+        PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID),
+            new AbsoluteLocationConstraint(NC1_ID),
+            new AbsoluteLocationConstraint(NC2_ID)
+        });
         group.setPartitionConstraint(groupPartitionConstraint);
 
-        ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 1 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc2);
-        PartitionConstraint sorterPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] {
+            1
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, desc2);
+        PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         sorter2.setPartitionConstraint(sorterPartitionConstraint2);
 
-        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
-                new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
-        PartitionConstraint groupPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+            1
+        }, new IComparatorFactory[] {
+            StringComparatorFactory.INSTANCE
+        }, new SumStringGroupAggregator(1), desc2);
+        PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         group2.setPartitionConstraint(groupPartitionConstraint2);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                0
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(conn1, csvScanner, 0, sorter, 0);
 
         IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
         spec.connect(conn2, sorter, 0, group, 0);
 
         IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(conn3, group, 0, sorter2, 0);
 
         IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
index d619bba..24d45fa 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
@@ -19,6 +19,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -36,19 +37,23 @@
     public void scanPrint01() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] splits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/words.txt")),
-                new FileSplit(NC1_ID, new File("data/words.txt")) };
-        RecordDescriptor desc = new RecordDescriptor(
-                new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+        FileSplit[] splits = new FileSplit[] {
+            new FileSplit(NC2_ID, new File("data/words.txt")), new FileSplit(NC1_ID, new File("data/words.txt"))
+        };
+        RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
-        PartitionConstraint csvPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID)
+        });
         csvScanner.setPartitionConstraint(csvPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index cd254e2..0b19cea 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -19,6 +19,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -43,39 +44,57 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
-                new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+            new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+        };
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
-        PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            '|', "'\"");
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
-        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
-        PartitionConstraint sortersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
+            1
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, ordersDesc);
+        PartitionConstraint sortersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         sorter.setPartitionConstraint(sortersPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
         spec.connect(new MToNHashPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
-                new int[] { 1 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }),
-                new int[] { 1 }, new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }), sorter, 0,
-                printer, 0);
-        
+            new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }), new int[] {
+            1
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }), sorter, 0, printer, 0);
+
         runTest(spec);
     }
 }
\ No newline at end of file
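The reformatted connect call above wires an MToNHashPartitioningMergingConnectorDescriptor: tuples are hash-partitioned on field 1 and, on the receiving side, the pre-sorted streams from the sorter partitions are merged with the same comparator, so each printer partition sees one fully sorted sequence. A hedged sketch of the merge half as a generic k-way merge of sorted runs (not the Hyracks implementation):

    import java.util.*;

    class KWayMergeSketch {
        // Merge k individually sorted runs into one sorted list using a
        // min-heap of (runIndex, position) cursors.
        static <T> List<T> merge(List<List<T>> runs, Comparator<T> cmp) {
            PriorityQueue<int[]> heap = new PriorityQueue<>(
                (a, b) -> cmp.compare(runs.get(a[0]).get(a[1]),
                                      runs.get(b[0]).get(b[1])));
            for (int i = 0; i < runs.size(); i++)
                if (!runs.get(i).isEmpty())
                    heap.add(new int[] { i, 0 });
            List<T> out = new ArrayList<>();
            while (!heap.isEmpty()) {
                int[] cur = heap.poll();
                List<T> run = runs.get(cur[0]);
                out.add(run.get(cur[1]));
                if (cur[1] + 1 < run.size())
                    heap.add(new int[] { cur[0], cur[1] + 1 });
            }
            return out;
        }
    }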
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 593eac1..7494f11 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -19,6 +19,7 @@
 import org.junit.Test;
 
 import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
 import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
 import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -42,83 +43,94 @@
 
 public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
     /*
-     * TPCH Customer table:
-     * CREATE TABLE CUSTOMER (
-     * C_CUSTKEY INTEGER NOT NULL,
-     * C_NAME VARCHAR(25) NOT NULL,
-     * C_ADDRESS VARCHAR(40) NOT NULL,
-     * C_NATIONKEY INTEGER NOT NULL,
-     * C_PHONE CHAR(15) NOT NULL,
-     * C_ACCTBAL DECIMAL(15,2) NOT NULL,
-     * C_MKTSEGMENT CHAR(10) NOT NULL,
-     * C_COMMENT VARCHAR(117) NOT NULL
-     * );
-     * TPCH Orders table:
-     * CREATE TABLE ORDERS (
-     * O_ORDERKEY INTEGER NOT NULL,
-     * O_CUSTKEY INTEGER NOT NULL,
-     * O_ORDERSTATUS CHAR(1) NOT NULL,
-     * O_TOTALPRICE DECIMAL(15,2) NOT NULL,
-     * O_ORDERDATE DATE NOT NULL,
-     * O_ORDERPRIORITY CHAR(15) NOT NULL,
-     * O_CLERK CHAR(15) NOT NULL,
-     * O_SHIPPRIORITY INTEGER NOT NULL,
-     * O_COMMENT VARCHAR(79) NOT NULL
-     * );
+     * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL ); TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL, O_COMMENT VARCHAR(79) NOT NULL );
      */
 
     @Test
     public void customerOrderCIDJoin() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+        FileSplit[] custSplits = new FileSplit[] {
+            new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl"))
+        };
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
-        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+        FileSplit[] ordersSplits = new FileSplit[] {
+            new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl"))
+        };
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
-        PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+            '|', "'\"");
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
         CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
-        PartitionConstraint custPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+            "'\"");
+        PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
-        PartitionConstraint joinPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+            1
+        }, new int[] {
+            0
+        }, new IBinaryHashFunctionFactory[] {
+            StringBinaryHashFunctionFactory.INSTANCE
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, custOrderJoinDesc, 128);
+        PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         join.setPartitionConstraint(joinPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
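The hunk above keeps the InMemoryHashJoinOperatorDescriptor joining ORDERS (key field 1, O_CUSTKEY) against CUSTOMER (key field 0, C_CUSTKEY) with a table size of 128, emitting the 17-field concatenated record described by custOrderJoinDesc. A hedged sketch of the underlying build/probe pattern; which input builds the table and the output field order are assumptions here:

    import java.util.*;

    class HashJoinSketch {
        // Build a hash table on the customer rows (key = field 0), then
        // probe with order rows (key = field 1) and concatenate matches.
        static List<String[]> join(List<String[]> customers, List<String[]> orders) {
            Map<String, List<String[]>> table = new HashMap<>();
            for (String[] cust : customers)
                table.computeIfAbsent(cust[0], k -> new ArrayList<>()).add(cust);
            List<String[]> result = new ArrayList<>();
            for (String[] ord : orders)
                for (String[] cust : table.getOrDefault(ord[1], Collections.<String[]>emptyList())) {
                    String[] row = new String[ord.length + cust.length];
                    System.arraycopy(ord, 0, row, 0, ord.length);
                    System.arraycopy(cust, 0, row, ord.length, cust.length);
                    result.add(row);
                }
            return result;
        }
    }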
@@ -139,67 +151,104 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
-                new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+            new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
+        };
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
-                new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+            new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+        };
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
-        PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            '|', "'\"");
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
         CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
-        PartitionConstraint custPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            "'\"");
+        PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
-        PartitionConstraint joinPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+            1
+        }, new int[] {
+            0
+        }, new IBinaryHashFunctionFactory[] {
+            StringBinaryHashFunctionFactory.INSTANCE
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, custOrderJoinDesc, 128);
+        PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         join.setPartitionConstraint(joinPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                0
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
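
Every constraint site in these hunks follows the same pattern: the operator's partition placement is pinned by handing an ExplicitPartitionConstraint one AbsoluteLocationConstraint per partition, in place of constructing PartitionConstraint directly. A minimal standalone sketch of that pattern follows; the helper class, its method name, and the idea of factoring the construction out are illustrative only and not part of this change:

    import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
    import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
    import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
    import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;

    public class ConstraintSketch {
        // Builds the constraint used repeatedly in the tests above:
        // one AbsoluteLocationConstraint per desired partition, wrapped
        // in the new ExplicitPartitionConstraint subclass.
        public static PartitionConstraint onNodes(String... nodeIds) {
            LocationConstraint[] lcs = new LocationConstraint[nodeIds.length];
            for (int i = 0; i < nodeIds.length; ++i) {
                lcs[i] = new AbsoluteLocationConstraint(nodeIds[i]);
            }
            return new ExplicitPartitionConstraint(lcs);
        }
    }

With a helper along these lines, each call site would reduce to something like ordScanner.setPartitionConstraint(ConstraintSketch.onNodes(NC1_ID, NC2_ID)).
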
@@ -214,75 +263,114 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
-                new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+            new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
+        };
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         FileSplit[] ordersSplits = new FileSplit[] {
-                new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
-                new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+            new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+            new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+        };
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
-                StringSerializerDeserializer.INSTANCE });
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE,
+            StringSerializerDeserializer.INSTANCE
+        });
 
         CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-                '|', "'\"");
-        PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            '|', "'\"");
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
         CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-                "'\"");
-        PartitionConstraint custPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+            "'\"");
+        PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
         MaterializingOperatorDescriptor ordMat = new MaterializingOperatorDescriptor(spec, ordersDesc);
-        ordMat.setPartitionConstraint(new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
+        ordMat.setPartitionConstraint(new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        }));
 
         MaterializingOperatorDescriptor custMat = new MaterializingOperatorDescriptor(spec, custDesc);
-        custMat.setPartitionConstraint(new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
+        custMat.setPartitionConstraint(new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        }));
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
-                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
-                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
-        PartitionConstraint joinPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
-                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+            1
+        }, new int[] {
+            0
+        }, new IBinaryHashFunctionFactory[] {
+            StringBinaryHashFunctionFactory.INSTANCE
+        }, new IBinaryComparatorFactory[] {
+            StringBinaryComparatorFactory.INSTANCE
+        }, custOrderJoinDesc, 128);
+        PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+        });
         join.setPartitionConstraint(joinPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
-                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+            new AbsoluteLocationConstraint(NC1_ID)
+        });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 1 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                1
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(ordPartConn, ordScanner, 0, ordMat, 0);
 
         IConnectorDescriptor custPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
-                new FieldHashPartitionComputerFactory(new int[] { 0 },
-                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+            new FieldHashPartitionComputerFactory(new int[] {
+                0
+            }, new IBinaryHashFunctionFactory[] {
+                StringBinaryHashFunctionFactory.INSTANCE
+            }));
         spec.connect(custPartConn, custScanner, 0, custMat, 0);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);