Merged in GraceHashJoin. Added tests for GHJ. Threaded the total partition count (nPartitions) through IActivityNode runtime creation.

git-svn-id: https://hyracks.googlecode.com/svn/trunk@27 123451ca-8445-de46-9d55-352943316053
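
The hash join merged by this commit follows the classic Grace scheme: both inputs are first scattered into hash partitions, then matching partition pairs are joined with an in-memory table. The merged operator itself lies outside the hunks below; the following is a standalone Java sketch of the idea, not the Hyracks implementation:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of a Grace hash join; illustrative only, not the
// operator merged by this commit.
public class GraceHashJoinSketch {
    static int part(int key, int nPartitions) {
        // Mask the sign bit so the modulo result is a valid bucket index.
        return (Integer.hashCode(key) & Integer.MAX_VALUE) % nPartitions;
    }

    public static void main(String[] args) {
        int nPartitions = 3;
        int[] build = { 1, 2, 3, 4, 5 };
        int[] probe = { 2, 4, 6 };

        // Phase 1: scatter each input into nPartitions buckets (spilled to
        // disk by a real operator; in-memory lists here).
        List<List<Integer>> buildParts = new ArrayList<>();
        List<List<Integer>> probeParts = new ArrayList<>();
        for (int i = 0; i < nPartitions; ++i) {
            buildParts.add(new ArrayList<>());
            probeParts.add(new ArrayList<>());
        }
        for (int k : build) buildParts.get(part(k, nPartitions)).add(k);
        for (int k : probe) probeParts.get(part(k, nPartitions)).add(k);

        // Phase 2: join each partition pair with an in-memory hash table
        // built from the build side.
        for (int i = 0; i < nPartitions; ++i) {
            Map<Integer, Integer> table = new HashMap<>();
            for (int k : buildParts.get(i)) table.put(k, k);
            for (int k : probeParts.get(i)) {
                if (table.containsKey(k)) {
                    System.out.println("match: " + k);
                }
            }
        }
    }
}
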
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
index 5b57ee3..6a3a447 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
@@ -30,8 +30,8 @@
     public boolean supportsPullInterface();
 
     public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition);
+            int partition, int nPartitions);
 
     public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition);
+            int partition, int nPartitions);
 }
\ No newline at end of file
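
The hunk above is the backbone of the patch: both runtime factories on IActivityNode now receive the operator's total partition count alongside the local partition index, so a runtime knows its fan-out at construction time. Below is a compilable toy mirroring the shape of the change; the two small interfaces are pared-down stand-ins for the Hyracks ones, not the real API:

interface PushRuntime {
    void run();
}

interface ActivityNode {
    // Mirrors the signature change above: the factory now receives the
    // total partition count next to the partition index.
    PushRuntime createPushRuntime(int partition, int nPartitions);
}

public class NPartitionsDemo {
    public static void main(String[] args) {
        // Each runtime instance learns both its own index and the total
        // fan-out at creation time.
        ActivityNode node = (partition, nPartitions) ->
                () -> System.out.println("partition " + partition + " of " + nPartitions);
        int nPartitions = 4;
        for (int p = 0; p < nPartitions; ++p) {
            node.createPushRuntime(p, nPartitions).run();
        }
    }
}
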
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
index af60240..367a356 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/ClusterControllerService.java
@@ -90,9 +90,7 @@
         jolRuntime = (Runtime) Runtime.create(jolDebugLevel, System.err);
         jobManager = new JOLJobManagerImpl(this, jolRuntime);
         taskExecutor = Executors.newCachedThreadPool();
-        webServer = new WebServer(new Handler[] {
-            getAdminConsoleHandler(), getApplicationDataHandler()
-        });
+        webServer = new WebServer(new Handler[] { getAdminConsoleHandler(), getApplicationDataHandler() });
         this.timer = new Timer(true);
     }
 
@@ -176,7 +174,7 @@
 
     @Override
     public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
-        StageletStatistics statistics) throws Exception {
+            StageletStatistics statistics) throws Exception {
         jobManager.notifyStageletComplete(jobId, stageId, attempt, nodeId, statistics);
     }
 
@@ -205,7 +203,7 @@
         handler.setHandler(new AbstractHandler() {
             @Override
             public void handle(String target, Request baseRequest, HttpServletRequest request,
-                HttpServletResponse response) throws IOException, ServletException {
+                    HttpServletResponse response) throws IOException, ServletException {
                 if (!"/".equals(target)) {
                     return;
                 }
@@ -242,7 +240,7 @@
         handler.setHandler(new AbstractHandler() {
             @Override
             public void handle(String target, Request baseRequest, HttpServletRequest request,
-                HttpServletResponse response) throws IOException, ServletException {
+                    HttpServletResponse response) throws IOException, ServletException {
             }
         });
         return handler;
@@ -355,7 +353,7 @@
         private Map<OperatorDescriptorId, Set<Integer>> opPartitions;
 
         public Phase1Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId, int attempt,
-            Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
+                Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) {
             this.nodeId = nodeId;
             this.jobId = jobId;
             this.plan = plan;
@@ -391,8 +389,8 @@
         private Map<PortInstanceId, Endpoint> globalPortMap;
 
         public Phase2Installer(String nodeId, UUID jobId, JobPlan plan, UUID stageId,
-            Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
-            Map<PortInstanceId, Endpoint> globalPortMap) {
+                Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
+                Map<PortInstanceId, Endpoint> globalPortMap) {
             this.nodeId = nodeId;
             this.jobId = jobId;
             this.plan = plan;
@@ -530,7 +528,7 @@
     }
 
     static class PortMapMergingAccumulator implements
-        Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
+            Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
         Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
 
         @Override
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 9d7862a..e8b6f14 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -212,7 +212,7 @@
                                     }
                                 }
                                 ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
-                                    .size()];
+                                        .size()];
                                 int i = 0;
                                 for (List t2 : ts) {
                                     Object[] t2Data = t2.toArray();
@@ -229,18 +229,18 @@
                                         aParts.add((Integer) lData[1]);
                                     }
                                     p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
-                                        plan, stageId, attempt, tasks, opPartitions);
+                                            plan, stageId, attempt, tasks, opPartitions);
                                 }
                                 LOGGER.info("Stage start - Phase 1");
                                 Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
-                                    new ClusterControllerService.PortMapMergingAccumulator());
+                                        new ClusterControllerService.PortMapMergingAccumulator());
 
                                 ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
-                                    .size()];
+                                        .size()];
                                 ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
-                                    .size()];
+                                        .size()];
                                 ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
-                                    .size()];
+                                        .size()];
                                 i = 0;
                                 for (List t2 : ts) {
                                     Object[] t2Data = t2.toArray();
@@ -257,11 +257,11 @@
                                         aParts.add((Integer) lData[1]);
                                     }
                                     p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
-                                        plan, stageId, tasks, opPartitions, globalPortMap);
+                                            plan, stageId, tasks, opPartitions, globalPortMap);
                                     p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
-                                        stageId);
+                                            stageId);
                                     ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId,
-                                        stageId);
+                                            stageId);
                                     ++i;
                                 }
                                 LOGGER.info("Stage start - Phase 2");
@@ -296,7 +296,7 @@
                                 UUID jobId = (UUID) data[0];
                                 Set<String> ts = (Set<String>) data[1];
                                 ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
-                                    .size()];
+                                        .size()];
                                 int i = 0;
                                 for (String n : ts) {
                                     jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
@@ -305,7 +305,7 @@
                                     ccs.runRemote(jcns, null);
                                 } finally {
                                     BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable
-                                        .createTuple(jobId));
+                                            .createTuple(jobId));
                                     jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
                                     jolRuntime.evaluate();
                                 }
@@ -337,22 +337,22 @@
                                 Integer attempt = (Integer) data[2];
                                 Set<List> ts = (Set<List>) data[4];
                                 ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
-                                    .size()];
+                                        .size()];
                                 int i = 0;
                                 BasicTupleSet notificationTuples = new BasicTupleSet();
                                 for (List t2 : ts) {
                                     Object[] t2Data = t2.toArray();
                                     String nodeId = (String) t2Data[0];
                                     jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId,
-                                        attempt);
+                                            attempt);
                                     notificationTuples.add(AbortNotifyTable
-                                        .createTuple(jobId, stageId, nodeId, attempt));
+                                            .createTuple(jobId, stageId, nodeId, attempt));
                                 }
                                 try {
                                     ccs.runRemote(jas, null);
                                 } finally {
                                     jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples,
-                                        null);
+                                            null);
                                     jolRuntime.evaluate();
                                 }
                             } catch (Exception e) {
@@ -387,14 +387,14 @@
             @Override
             public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
                 acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.OUTPUT, operatorOutputIndex,
-                    taskOutputIndex));
+                        taskOutputIndex));
                 builder.addTargetEdge(operatorOutputIndex, task, taskOutputIndex);
             }
 
             @Override
             public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
                 acTuples.add(ActivityConnectionTable.createTuple(jobId, task, Direction.INPUT, operatorInputIndex,
-                    taskInputIndex));
+                        taskInputIndex));
                 builder.addSourceEdge(operatorInputIndex, task, taskInputIndex);
             }
 
@@ -437,7 +437,7 @@
     }
 
     private int addPartitionConstraintTuples(UUID jobId, IOperatorDescriptor od, BasicTupleSet olTuples,
-        BasicTupleSet ocTuples) {
+            BasicTupleSet ocTuples) {
         PartitionConstraint pc = od.getPartitionConstraint();
 
         switch (pc.getPartitionConstraintType()) {
@@ -457,7 +457,7 @@
     }
 
     private void addLocationConstraintTuple(BasicTupleSet olTuples, UUID jobId, OperatorDescriptorId opId, int i,
-        LocationConstraint locationConstraint, int benefit) {
+            LocationConstraint locationConstraint, int benefit) {
         switch (locationConstraint.getConstraintType()) {
             case ABSOLUTE:
                 String nodeId = ((AbsoluteLocationConstraint) locationConstraint).getLocationId();
@@ -527,8 +527,8 @@
     }
 
     @Override
-    public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
-        StageletStatistics statistics) throws Exception {
+    public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+            StageletStatistics statistics) throws Exception {
         BasicTupleSet scTuples = new BasicTupleSet();
         scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics));
 
@@ -538,7 +538,8 @@
     }
 
     @Override
-    public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
+    public synchronized void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId)
+            throws Exception {
         BasicTupleSet sfTuples = new BasicTupleSet();
         sfTuples.add(StageletFailureTable.createTuple(jobId, stageId, nodeId, attempt));
 
@@ -598,9 +599,8 @@
         private static Key PRIMARY_KEY = new Key(0);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, JobStatus.class, JobSpecification.class, JobPlan.class, Set.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, JobStatus.class, JobSpecification.class,
+                JobPlan.class, Set.class };
 
         public JobTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -645,9 +645,8 @@
         private static Key PRIMARY_KEY = new Key(0, 1);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, OperatorDescriptorId.class, Integer.class, IOperatorDescriptor.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
+                IOperatorDescriptor.class };
 
         public OperatorDescriptorTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -667,9 +666,8 @@
         private static Key PRIMARY_KEY = new Key();
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, OperatorDescriptorId.class, String.class, Integer.class, Integer.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, String.class,
+                Integer.class, Integer.class };
 
         public OperatorLocationTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -689,9 +687,7 @@
         private static Key PRIMARY_KEY = new Key();
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, OperatorDescriptorId.class, Integer.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class };
 
         public OperatorCloneCountTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -711,15 +707,9 @@
         private static Key PRIMARY_KEY = new Key(0, 1);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class,
-            ConnectorDescriptorId.class,
-            OperatorDescriptorId.class,
-            Integer.class,
-            OperatorDescriptorId.class,
-            Integer.class,
-            IConnectorDescriptor.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, ConnectorDescriptorId.class,
+                OperatorDescriptorId.class, Integer.class, OperatorDescriptorId.class, Integer.class,
+                IConnectorDescriptor.class };
 
         public ConnectorDescriptorTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -730,8 +720,8 @@
             int srcPort = jobSpec.getProducerOutputIndex(conn);
             IOperatorDescriptor destOD = jobSpec.getConsumer(conn);
             int destPort = jobSpec.getConsumerInputIndex(conn);
-            Tuple cdTuple = new Tuple(jobId, conn.getConnectorId(), srcOD.getOperatorId(), srcPort, destOD
-                .getOperatorId(), destPort, conn);
+            Tuple cdTuple = new Tuple(jobId, conn.getConnectorId(), srcOD.getOperatorId(), srcPort,
+                    destOD.getOperatorId(), destPort, conn);
             return cdTuple;
         }
     }
@@ -745,9 +735,8 @@
         private static Key PRIMARY_KEY = new Key(0, 1, 2);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, OperatorDescriptorId.class, ActivityNodeId.class, IActivityNode.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class,
+                ActivityNodeId.class, IActivityNode.class };
 
         public ActivityNodeTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -755,7 +744,7 @@
 
         static Tuple createTuple(UUID jobId, IActivityNode aNode) {
             return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), aNode.getActivityNodeId(),
-                aNode);
+                    aNode);
         }
     }
 
@@ -768,17 +757,16 @@
         private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, OperatorDescriptorId.class, Integer.class, Direction.class, ActivityNodeId.class, Integer.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
+                Direction.class, ActivityNodeId.class, Integer.class };
 
         public ActivityConnectionTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
         }
 
         static Tuple createTuple(UUID jobId, IActivityNode aNode, Direction direction, int odPort, int activityPort) {
-            return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), odPort, direction, aNode
-                .getActivityNodeId(), activityPort);
+            return new Tuple(jobId, aNode.getActivityNodeId().getOperatorDescriptorId(), odPort, direction,
+                    aNode.getActivityNodeId(), activityPort);
         }
     }
 
@@ -791,13 +779,8 @@
         private static Key PRIMARY_KEY = new Key(0, 1, 2, 3, 4);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class,
-            OperatorDescriptorId.class,
-            ActivityNodeId.class,
-            OperatorDescriptorId.class,
-            ActivityNodeId.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class,
+                ActivityNodeId.class, OperatorDescriptorId.class, ActivityNodeId.class };
 
         public ActivityBlockedTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -819,9 +802,7 @@
         private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobstart");
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, Long.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, Long.class };
 
         public JobStartTable() {
             super(TABLE_NAME, SCHEMA);
@@ -841,9 +822,8 @@
         private static Key PRIMARY_KEY = new Key(0, 1, 2);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, UUID.class, Integer.class, JobPlan.class, Set.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, JobPlan.class,
+                Set.class };
 
         public StartMessageTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -859,9 +839,7 @@
         private static Key PRIMARY_KEY = new Key(0);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, Set.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, Set.class };
 
         public JobCleanUpTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -875,9 +853,7 @@
         private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "jobcleanupcomplete");
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class };
 
         public JobCleanUpCompleteTable() {
             super(TABLE_NAME, SCHEMA);
@@ -897,16 +873,15 @@
         private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, UUID.class, String.class, Integer.class, StageletStatistics.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class,
+                StageletStatistics.class };
 
         public StageletCompleteTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
         }
 
         public static Tuple createTuple(UUID jobId, UUID stageId, String nodeId, int attempt,
-            StageletStatistics statistics) {
+                StageletStatistics statistics) {
             return new Tuple(jobId, stageId, nodeId, attempt, statistics);
         }
     }
@@ -920,9 +895,7 @@
         private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, UUID.class, String.class, Integer.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class };
 
         public StageletFailureTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -942,9 +915,7 @@
         private static Key PRIMARY_KEY = new Key(0);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            String.class
-        };
+        private static final Class[] SCHEMA = new Class[] { String.class };
 
         public AvailableNodesTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -964,9 +935,7 @@
         private static Key PRIMARY_KEY = new Key(0);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            String.class, Integer.class
-        };
+        private static final Class[] SCHEMA = new Class[] { String.class, Integer.class };
 
         public RankedAvailableNodesTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -986,9 +955,7 @@
         private static Key PRIMARY_KEY = new Key(0);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            String.class
-        };
+        private static final Class[] SCHEMA = new Class[] { String.class };
 
         public FailedNodesTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -1008,9 +975,8 @@
         private static Key PRIMARY_KEY = new Key(0, 1, 2);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, UUID.class, Integer.class, JobPlan.class, Set.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, Integer.class, JobPlan.class,
+                Set.class };
 
         public AbortMessageTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -1026,9 +992,7 @@
         private static Key PRIMARY_KEY = new Key(0, 1, 2, 3);
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, UUID.class, String.class, Integer.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, UUID.class, String.class, Integer.class };
 
         public AbortNotifyTable(Runtime context) {
             super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
@@ -1043,9 +1007,8 @@
         private static final String TABLE_NAME = "expandpartitioncountconstraint";
 
         @SuppressWarnings("unchecked")
-        private static final Class[] SCHEMA = new Class[] {
-            UUID.class, OperatorDescriptorId.class, Integer.class, Integer.class
-        };
+        private static final Class[] SCHEMA = new Class[] { UUID.class, OperatorDescriptorId.class, Integer.class,
+                Integer.class };
 
         public ExpandPartitionCountConstraintTableFunction() {
             super(TABLE_NAME, SCHEMA);
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index 195fe00..758a2a0 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -151,7 +151,7 @@
         Matcher m = pattern.matcher(ipaddrStr);
         if (!m.matches()) {
             throw new Exception(MessageFormat.format(
-                "Connection Manager IP Address String %s does is not a valid IP Address.", ipaddrStr));
+                    "Connection Manager IP Address String %s is not a valid IP Address.", ipaddrStr));
         }
         byte[] ipBytes = new byte[4];
         ipBytes[0] = (byte) Integer.parseInt(m.group(1));
@@ -163,7 +163,8 @@
 
     @Override
     public Map<PortInstanceId, Endpoint> initializeJobletPhase1(UUID jobId, JobPlan plan, UUID stageId, int attempt,
-        Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions) throws Exception {
+            Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions)
+            throws Exception {
         LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 1");
 
         final Joblet joblet = getLocalJoblet(jobId);
@@ -184,7 +185,8 @@
             IOperatorDescriptor op = han.getOwner();
             List<IConnectorDescriptor> inputs = plan.getTaskInputs(hanId);
             for (int i : tasks.get(hanId)) {
-                IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i);
+                IOperatorNodePushable hon = han.createPushRuntime(ctx, plan, joblet.getEnvironment(op, i), i,
+                        opPartitions.get(op.getOperatorId()).size());
                 OperatorRunnable or = new OperatorRunnable(ctx, hon);
                 stagelet.setOperator(op.getOperatorId(), i, or);
                 if (inputs != null) {
@@ -194,21 +196,21 @@
                         }
                         IConnectorDescriptor conn = inputs.get(j);
                         OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
-                            .getOperatorId();
+                                .getOperatorId();
                         OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
-                            .getOperatorId();
+                                .getOperatorId();
                         Endpoint endpoint = new Endpoint(connectionManager.getNetworkAddress(), i);
                         endpointList.add(endpoint);
                         DemuxDataReceiveListenerFactory drlf = new DemuxDataReceiveListenerFactory(ctx, jobId, stageId);
                         connectionManager.acceptConnection(endpoint.getEndpointId(), drlf);
                         PortInstanceId piId = new PortInstanceId(op.getOperatorId(), Direction.INPUT, plan
-                            .getTaskInputMap().get(hanId).get(j), i);
+                                .getTaskInputMap().get(hanId).get(j), i);
                         if (LOGGER.isLoggable(Level.FINEST)) {
                             LOGGER.finest("Created endpoint " + piId + " -> " + endpoint);
                         }
                         portMap.put(piId, endpoint);
                         IFrameReader reader = createReader(conn, drlf, i, plan, stagelet, opPartitions
-                            .get(producerOpId).size(), opPartitions.get(consumerOpId).size());
+                                .get(producerOpId).size(), opPartitions.get(consumerOpId).size());
                         or.setFrameReader(reader);
                     }
                 }
@@ -222,10 +224,10 @@
     }
 
     private IFrameReader createReader(final IConnectorDescriptor conn, IConnectionDemultiplexer demux,
-        final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
-        throws HyracksDataException {
+            final int receiverIndex, JobPlan plan, final Stagelet stagelet, int nProducerCount, int nConsumerCount)
+            throws HyracksDataException {
         final IFrameReader reader = conn.createReceiveSideReader(ctx, plan, demux, receiverIndex, nProducerCount,
-            nConsumerCount);
+                nConsumerCount);
 
         return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameReader() {
             private int frameCount;
@@ -248,17 +250,18 @@
             @Override
             public void close() throws HyracksDataException {
                 reader.close();
-                stagelet.getStatistics().getStatisticsMap().put(
-                    "framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
-                    String.valueOf(frameCount));
+                stagelet.getStatistics()
+                        .getStatisticsMap()
+                        .put("framecount." + conn.getConnectorId().getId() + ".receiver." + receiverIndex,
+                                String.valueOf(frameCount));
             }
         } : reader;
     }
 
     @Override
     public void initializeJobletPhase2(UUID jobId, final JobPlan plan, UUID stageId,
-        Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
-        final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
+            Map<ActivityNodeId, Set<Integer>> tasks, Map<OperatorDescriptorId, Set<Integer>> opPartitions,
+            final Map<PortInstanceId, Endpoint> globalPortMap) throws Exception {
         LOGGER.log(Level.INFO, String.valueOf(jobId) + "[" + id + ":" + stageId + "]: Initializing Joblet Phase 2");
         final Joblet ji = getLocalJoblet(jobId);
         Stagelet si = (Stagelet) ji.getStagelet(stageId);
@@ -278,15 +281,15 @@
                     for (int j = 0; j < outputs.size(); ++j) {
                         final IConnectorDescriptor conn = outputs.get(j);
                         OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
-                            .getOperatorId();
+                                .getOperatorId();
                         OperatorDescriptorId consumerOpId = plan.getJobSpecification().getConsumer(conn)
-                            .getOperatorId();
+                                .getOperatorId();
                         final int senderIndex = i;
                         IEndpointDataWriterFactory edwFactory = new IEndpointDataWriterFactory() {
                             @Override
                             public IFrameWriter createFrameWriter(int index) throws HyracksDataException {
                                 PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
-                                    Direction.INPUT, spec.getConsumerInputIndex(conn), index);
+                                        Direction.INPUT, spec.getConsumerInputIndex(conn), index);
                                 Endpoint ep = globalPortMap.get(piId);
                                 if (ep == null) {
                                     LOGGER.info("Got null Endpoint for " + piId);
@@ -295,12 +298,12 @@
                                 if (LOGGER.isLoggable(Level.FINEST)) {
                                     LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
                                 }
-                                return createWriter(connectionManager.connect(ep.getNetworkAddress(), ep
-                                    .getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
+                                return createWriter(connectionManager.connect(ep.getNetworkAddress(),
+                                        ep.getEndpointId(), senderIndex), plan, conn, senderIndex, index, stagelet);
                             }
                         };
-                        or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i, opPartitions.get(
-                            producerOpId).size(), opPartitions.get(consumerOpId).size()));
+                        or.setFrameWriter(j, conn.createSendSideWriter(ctx, plan, edwFactory, i,
+                                opPartitions.get(producerOpId).size(), opPartitions.get(consumerOpId).size()));
                     }
                 }
                 stagelet.installRunnable(new OperatorInstanceId(op.getOperatorId(), i));
@@ -309,7 +312,7 @@
     }
 
     private IFrameWriter createWriter(final IFrameWriter writer, JobPlan plan, final IConnectorDescriptor conn,
-        final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
+            final int senderIndex, final int receiverIndex, final Stagelet stagelet) throws HyracksDataException {
         return plan.getJobFlags().contains(JobFlag.COLLECT_FRAME_COUNTS) ? new IFrameWriter() {
             private int frameCount;
 
@@ -328,9 +331,10 @@
             @Override
             public void close() throws HyracksDataException {
                 writer.close();
-                stagelet.getStatistics().getStatisticsMap().put(
-                    "framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "." + receiverIndex,
-                    String.valueOf(frameCount));
+                stagelet.getStatistics()
+                        .getStatisticsMap()
+                        .put("framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "."
+                                + receiverIndex, String.valueOf(frameCount));
             }
         } : writer;
     }
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
index e16cf30..32793eb 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -113,8 +113,8 @@
                     return;
                 }
                 try {
-                    LOGGER.log(Level.INFO, "Starting runnable for operator: " + joblet.getJobId() + ":" + stageId + ":"
-                        + opIId.getOperatorId() + ":" + opIId.getPartition());
+                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                            + opIId.getPartition() + ": STARTING");
                 } catch (Exception e) {
                     e.printStackTrace();
                     // notifyOperatorFailure(opIId);
@@ -126,6 +126,13 @@
                     e.printStackTrace();
                     // notifyOperatorFailure(opIId);
                 }
+                try {
+                    LOGGER.log(Level.INFO, joblet.getJobId() + ":" + stageId + ":" + opIId.getOperatorId() + ":"
+                            + opIId.getPartition() + ": TERMINATED");
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    // notifyOperatorFailure(opIId);
+                }
             }
         });
     }
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java
index e5052b2..457a44c 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/ExternalSortOperatorDescriptor.java
@@ -92,13 +92,13 @@
 
         @Override
         public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition) {
+                int partition, int nPartitions) {
             return null;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition) {
+                final IOperatorEnvironment env, int partition, int nPartitions) {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -306,13 +306,13 @@
 
         @Override
         public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition) {
+                int partition, int nPartitions) {
             return null;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition) {
+                final IOperatorEnvironment env, int partition, int nPartitions) {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java
index 45e882a..1c7e6c7 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/InMemorySortOperatorDescriptor.java
@@ -77,13 +77,13 @@
 
         @Override
         public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition) {
+                int partition, int nPartitions) {
             return null;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition) {
+                final IOperatorEnvironment env, int partition, int nPartitions) {
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -240,13 +240,13 @@
 
         @Override
         public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition) {
+                int partition, int nPartitions) {
             return null;
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition) {
+                final IOperatorEnvironment env, int partition, int nPartitions) {
             IOperatorNodePushable op = new IOperatorNodePushable() {
                 private IFrameWriter writer;
 
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java
index faad82c..807e890 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MapperOperatorDescriptor.java
@@ -71,13 +71,13 @@
 
     @Override
     public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         throw new UnsupportedOperationException();
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         return new DeserializedOperatorNodePushable(ctx, new MapperOperator(), plan, getActivityNodeId());
     }
 
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java
index 5e52989..9f06d7b 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/MaterializingOperatorDescriptor.java
@@ -63,13 +63,13 @@
 
         @Override
         public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition) {
+                int partition, int nPartitions) {
             throw new UnsupportedOperationException();
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition) {
+                final IOperatorEnvironment env, int partition, int nPartitions) {
             return new IOperatorNodePushable() {
                 private FileChannel out;
                 private int frameCount;
@@ -141,13 +141,13 @@
 
         @Override
         public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition) {
+                int partition, int nPartitions) {
             throw new UnsupportedOperationException();
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition) {
+                final IOperatorEnvironment env, int partition, int nPartitions) {
             return new IOperatorNodePushable() {
                 private IFrameWriter writer;
 
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java
index fd32245..a07a2f9 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/PrinterOperatorDescriptor.java
@@ -59,13 +59,13 @@
 
     @Override
     public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         throw new UnsupportedOperationException();
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(), plan, getActivityNodeId());
     }
 
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java
index feb4d2f..08ec2bb 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/SplitVectorOperatorDescriptor.java
@@ -50,13 +50,13 @@
 
         @Override
         public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition) {
+                int partition, int nPartitions) {
             throw new UnsupportedOperationException();
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, final IOperatorEnvironment env,
-                int partition) {
+        public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan,
+                final IOperatorEnvironment env, int partition, int nPartitions) {
             IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
                 private ArrayList<Object[]> buffer;
 
@@ -105,13 +105,13 @@
 
         @Override
         public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition) {
+                int partition, int nPartitions) {
             throw new UnsupportedOperationException();
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, final IOperatorEnvironment env,
-                int partition) {
+        public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan,
+                final IOperatorEnvironment env, int partition, int nPartitions) {
             IOpenableDataWriterOperator op = new IOpenableDataWriterOperator() {
                 private IOpenableDataWriter<Object[]> writer;
 
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java
index 82e805b..a4571ee 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileScanOperatorDescriptor.java
@@ -102,13 +102,13 @@
 
     @Override
     public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         throw new UnsupportedOperationException();
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), plan, getActivityNodeId());
     }
 
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java
index 5bf6a86..d04df29 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/file/AbstractFileWriteOperatorDescriptor.java
@@ -73,14 +73,14 @@
     protected FileSplit[] splits;
 
     public FileSplit[] getSplits() {
-		return splits;
-	}
+        return splits;
+    }
 
-	public void setSplits(FileSplit[] splits) {
-		this.splits = splits;
-	}
+    public void setSplits(FileSplit[] splits) {
+        this.splits = splits;
+    }
 
-	public AbstractFileWriteOperatorDescriptor(JobSpecification spec, FileSplit[] splits) {
+    public AbstractFileWriteOperatorDescriptor(JobSpecification spec, FileSplit[] splits) {
         super(spec, 1, 0);
         this.splits = splits;
     }
@@ -89,13 +89,13 @@
 
     @Override
     public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         throw new UnsupportedOperationException();
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         return new DeserializedOperatorNodePushable(ctx, new FileWriteOperator(partition), plan, getActivityNodeId());
     }
 
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java
index 2fc6b63..664c84d 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/HashGroupOperatorDescriptor.java
@@ -74,13 +74,13 @@
 
         @Override
         public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition) {
+                int partition, int nPartitions) {
             throw new UnsupportedOperationException();
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition) {
+                final IOperatorEnvironment env, int partition, int nPartitions) {
             final RecordDescriptor rd0 = plan.getJobSpecification()
                     .getOperatorInputRecordDescriptor(getOperatorId(), 0);
             final FrameTupleAccessor accessor = new FrameTupleAccessor(ctx, rd0);
@@ -135,13 +135,13 @@
 
         @Override
         public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition) {
+                int partition, int nPartitions) {
             throw new UnsupportedOperationException();
         }
 
         @Override
-        public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, final IOperatorEnvironment env,
-                int partition) {
+        public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan,
+                final IOperatorEnvironment env, int partition, int nPartitions) {
             return new IOperatorNodePushable() {
                 private IFrameWriter writer;
 
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java
index 0232d73..e7e04a5 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/PreclusteredGroupOperatorDescriptor.java
@@ -44,13 +44,13 @@
 
     @Override
     public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         throw new UnsupportedOperationException();
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         IComparator[] comparators = new IComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createComparator();
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
index 5ee84e47..90502b0 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/AbstractHadoopFileScanOperatorDescriptor.java
@@ -14,9 +14,9 @@
  */
 package edu.uci.ics.hyracks.coreops.hadoop;
 
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Counters.Counter;
 
 import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
@@ -143,13 +143,13 @@
 
     @Override
     public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         throw new UnsupportedOperationException();
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         return new DeserializedOperatorNodePushable(ctx, new FileScanOperator(partition), plan, getActivityNodeId());
     }
 
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java
index 5385e44..c144365 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopMapperOperatorDescriptor.java
@@ -33,7 +33,6 @@
 import edu.uci.ics.hyracks.context.HyracksContext;
 import edu.uci.ics.hyracks.coreops.base.IOpenableDataWriterOperator;
 import edu.uci.ics.hyracks.coreops.util.DeserializedOperatorNodePushable;
-import edu.uci.ics.hyracks.hadoop.util.ClasspathBasedHadoopClassFactory;
 import edu.uci.ics.hyracks.hadoop.util.DatatypeHelper;
 import edu.uci.ics.hyracks.hadoop.util.IHadoopClassFactory;
 
@@ -91,7 +90,7 @@
     private static final long serialVersionUID = 1L;
     private static final String mapClassNameKey = "mapred.mapper.class";
     private Class<? extends Mapper> mapperClass;
-   
+
     public HadoopMapperOperatorDescriptor(JobSpecification spec, Class<? extends Mapper> mapperClass,
             RecordDescriptor recordDescriptor, JobConf jobConf) {
         super(spec, recordDescriptor, jobConf, null);
@@ -102,20 +101,24 @@
         super(spec, null, jobConf, hadoopClassFactory);
     }
 
-    public RecordDescriptor getRecordDescriptor(JobConf conf){
-    	RecordDescriptor recordDescriptor = null;
-    	String mapOutputKeyClassName = conf.getMapOutputKeyClass().getName();
-		String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
-		try{
-			if(getHadoopClassFactory() == null){
-				recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class < ? extends Writable>) Class.forName(mapOutputKeyClassName),(Class < ? extends Writable>) Class.forName(mapOutputValueClassName));
-			}else{
-				recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class < ? extends Writable>) getHadoopClassFactory().loadClass(mapOutputKeyClassName),(Class < ? extends Writable>) getHadoopClassFactory().loadClass(mapOutputValueClassName));
-			}
-		}catch(Exception e){
-			e.printStackTrace();
-		}
-		return recordDescriptor;
+    public RecordDescriptor getRecordDescriptor(JobConf conf) {
+        RecordDescriptor recordDescriptor = null;
+        String mapOutputKeyClassName = conf.getMapOutputKeyClass().getName();
+        String mapOutputValueClassName = conf.getMapOutputValueClass().getName();
+        try {
+            if (getHadoopClassFactory() == null) {
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                        (Class<? extends Writable>) Class.forName(mapOutputKeyClassName),
+                        (Class<? extends Writable>) Class.forName(mapOutputValueClassName));
+            } else {
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                        (Class<? extends Writable>) getHadoopClassFactory().loadClass(mapOutputKeyClassName),
+                        (Class<? extends Writable>) getHadoopClassFactory().loadClass(mapOutputValueClassName));
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return recordDescriptor;
     }
 
     private Mapper<K1, V1, K2, V2> createMapper() throws Exception {
@@ -123,21 +126,21 @@
             return mapperClass.newInstance();
         } else {
             String mapperClassName = super.getJobConfMap().get(mapClassNameKey);
-        	Object mapper = getHadoopClassFactory().createMapper(mapperClassName);
+            Object mapper = getHadoopClassFactory().createMapper(mapperClassName);
             mapperClass = (Class<? extends Mapper>) mapper.getClass();
-            return (Mapper)mapper;
+            return (Mapper) mapper;
         }
     }
 
     @Override
     public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         throw new UnsupportedOperationException();
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         return new DeserializedOperatorNodePushable(ctx, new MapperOperator(), plan, getActivityNodeId());
     }
 
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java
index 7881d45..e7bfa31 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReducerOperatorDescriptor.java
@@ -22,11 +22,11 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Counters.Counter;
 
 import edu.uci.ics.hyracks.api.dataflow.IDataReader;
 import edu.uci.ics.hyracks.api.dataflow.IDataWriter;
@@ -183,7 +183,6 @@
     private static final String reducerClassKey = "mapred.reducer.class";
     private static final String comparatorClassKey = "mapred.output.value.groupfn.class";
     private IComparatorFactory comparatorFactory;
-    
 
     public HadoopReducerOperatorDescriptor(JobSpecification spec, IComparatorFactory comparatorFactory,
             Class<? extends Reducer> reducerClass, RecordDescriptor recordDescriptor, JobConf jobConf) {
@@ -192,49 +191,49 @@
         this.reducerClass = reducerClass;
     }
 
-    
     public HadoopReducerOperatorDescriptor(JobSpecification spec, JobConf conf, IHadoopClassFactory classFactory) {
-        super(spec, null , conf, classFactory);
+        super(spec, null, conf, classFactory);
     }
-    
+
     private Reducer<K2, V2, K3, V3> createReducer() throws Exception {
-        if(reducerClass != null){
-        	return reducerClass.newInstance();
-        }else{
-        	Object reducer = getHadoopClassFactory().createReducer(getJobConfMap().get(reducerClassKey));
+        if (reducerClass != null) {
+            return reducerClass.newInstance();
+        } else {
+            Object reducer = getHadoopClassFactory().createReducer(getJobConfMap().get(reducerClassKey));
             reducerClass = (Class<? extends Reducer>) reducer.getClass();
-            return (Reducer)reducer;	
+            return (Reducer) reducer;
         }
     }
 
     @Override
     public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         throw new UnsupportedOperationException();
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-            int partition) {
+            int partition, int nPartitions) {
         try {
-        	if(this.comparatorFactory == null){
-	            String comparatorClassName  = getJobConfMap().get(comparatorClassKey);
-	            RawComparator rawComparator = null;
-	            if(comparatorClassName != null){
-		            Class comparatorClazz = getHadoopClassFactory().loadClass(comparatorClassName);
-				    this.comparatorFactory = new KeyComparatorFactory(comparatorClazz);
-		            
-			    }else{
-		        	String mapOutputKeyClass = getJobConfMap().get("mapred.mapoutput.key.class");
-		        	if(getHadoopClassFactory() != null){
-		        		rawComparator = WritableComparator.get(getHadoopClassFactory().loadClass(mapOutputKeyClass));
-		        	}else{
-		        		rawComparator = WritableComparator.get((Class<? extends WritableComparable>)Class.forName(mapOutputKeyClass));
-		        	}
-		        	this.comparatorFactory = new WritableComparingComparatorFactory(rawComparator.getClass());
-		        }
-        	}
-	        IOpenableDataWriterOperator op = new PreclusteredGroupOperator(new int[] { 0 },
+            if (this.comparatorFactory == null) {
+                String comparatorClassName = getJobConfMap().get(comparatorClassKey);
+                RawComparator rawComparator = null;
+                if (comparatorClassName != null) {
+                    Class comparatorClazz = getHadoopClassFactory().loadClass(comparatorClassName);
+                    this.comparatorFactory = new KeyComparatorFactory(comparatorClazz);
+
+                } else {
+                    String mapOutputKeyClass = getJobConfMap().get("mapred.mapoutput.key.class");
+                    if (getHadoopClassFactory() != null) {
+                        rawComparator = WritableComparator.get(getHadoopClassFactory().loadClass(mapOutputKeyClass));
+                    } else {
+                        rawComparator = WritableComparator.get((Class<? extends WritableComparable>) Class
+                                .forName(mapOutputKeyClass));
+                    }
+                    this.comparatorFactory = new WritableComparingComparatorFactory(rawComparator.getClass());
+                }
+            }
+            IOpenableDataWriterOperator op = new PreclusteredGroupOperator(new int[] { 0 },
                     new IComparator[] { comparatorFactory.createComparator() }, new ReducerAggregator(createReducer()));
             return new DeserializedOperatorNodePushable(ctx, op, plan, getActivityNodeId());
         } catch (Exception e) {
@@ -260,21 +259,25 @@
         return true;
     }
 
-	@Override
-	public RecordDescriptor getRecordDescriptor(JobConf conf) {
-		String outputKeyClassName = conf.get("mapred.output.key.class");
-		String outputValueClassName = conf.get("mapred.output.value.class");
-		RecordDescriptor recordDescriptor = null;
-		try{
-			if(getHadoopClassFactory() == null){
-				recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>)Class.forName(outputKeyClassName), (Class<? extends Writable>)Class.forName(outputValueClassName));
-			}else{
-				recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>)getHadoopClassFactory().loadClass(outputKeyClassName), (Class<? extends Writable>)getHadoopClassFactory().loadClass(outputValueClassName));
-			}
-		}catch(Exception e){
-			e.printStackTrace();
-			return null;
-		}	
-		return recordDescriptor;
-	}
+    @Override
+    public RecordDescriptor getRecordDescriptor(JobConf conf) {
+        String outputKeyClassName = conf.get("mapred.output.key.class");
+        String outputValueClassName = conf.get("mapred.output.value.class");
+        RecordDescriptor recordDescriptor = null;
+        try {
+            if (getHadoopClassFactory() == null) {
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                        (Class<? extends Writable>) Class.forName(outputKeyClassName),
+                        (Class<? extends Writable>) Class.forName(outputValueClassName));
+            } else {
+                recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
+                        (Class<? extends Writable>) getHadoopClassFactory().loadClass(outputKeyClassName),
+                        (Class<? extends Writable>) getHadoopClassFactory().loadClass(outputValueClassName));
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+        return recordDescriptor;
+    }
 }
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java
new file mode 100644
index 0000000..79d88af
--- /dev/null
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/GraceHashJoinOperatorDescriptor.java
@@ -0,0 +1,386 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.coreops.join;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.coreops.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
+
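+/**
+ * Grace hash join: both inputs are first hash-partitioned on their join keys
+ * into spill files on disk (build side RelR, probe side RelS); matching
+ * partition pairs are then joined one at a time with an in-memory hash join.
+ */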
+public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final String SMALLRELATION = "RelR";
+    private static final String LARGERELATION = "RelS";
+    private static final String NUM_PARTITION = "NUMBER_PARTITIONS";
+    private static final long serialVersionUID = 1L;
+    private final int[] keys0;
+    private final int[] keys1;
+    private final int inputsize0;
+    private final int recordsPerFrame;
+    private final int memsize;
+    private final double factor;
+    private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+    private final IBinaryComparatorFactory[] comparatorFactories;
+
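+    // rough I/O counters: frames written while partitioning, and frames read
+    // back during the build (numReadI1) and probe (numReadI2) passes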
+    private int numReadI1 = 0;
+    private int numWrite = 0;
+    private int numReadI2 = 0;
+
+    public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+        super(spec, 2, 1);
+        this.memsize = memsize;
+        this.inputsize0 = inputsize0;
+        this.recordsPerFrame = recordsPerFrame;
+        this.factor = factor;
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
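+    // RelR is partitioned first, then RelS, and the join starts only after
+    // both relations have been completely spilled.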
+    @Override
+    public void contributeTaskGraph(IActivityGraphBuilder builder) {
+        HashPartitionActivityNode rpart = new HashPartitionActivityNode(SMALLRELATION, keys0, 0);
+        HashPartitionActivityNode spart = new HashPartitionActivityNode(LARGERELATION, keys1, 1);
+        JoinActivityNode join = new JoinActivityNode();
+
+        builder.addTask(rpart);
+        builder.addSourceEdge(0, rpart, 0);
+
+        builder.addTask(spart);
+        builder.addSourceEdge(1, spart, 0);
+
+        builder.addTask(join);
+        builder.addBlockingEdge(rpart, spart);
+        builder.addBlockingEdge(spart, join);
+
+        builder.addTargetEdge(0, join, 0);
+    }
+
+    public int getMemorySize() {
+        return memsize;
+    }
+
+    private class HashPartitionActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+        private String relationName;
+        private int operatorInputIndex;
+        private int keys[];
+
+        public HashPartitionActivityNode(String relationName, int keys[], int operatorInputIndex) {
+            this.relationName = relationName;
+            this.keys = keys;
+            this.operatorInputIndex = operatorInputIndex;
+        }
+
+        @Override
+        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
+                int partition, int nPartitions) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
+                final IOperatorEnvironment env, int partition, final int nPartitions) {
+            final RecordDescriptor rd0 = plan.getJobSpecification().getOperatorInputRecordDescriptor(getOperatorId(),
+                    operatorInputIndex);
+            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+            for (int i = 0; i < comparatorFactories.length; ++i) {
+                comparators[i] = comparatorFactories[i].createBinaryComparator();
+            }
+            IOperatorNodePushable op = new IOperatorNodePushable() {
+                private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx, rd0);
+
+                ITuplePartitionComputer hpc = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories)
+                        .createPartitioner();
+
+                private final FrameTupleAppender appender = new FrameTupleAppender(ctx);
+                private ByteBuffer[] outbufs;
+                private File[] files;
+                private FileChannel[] channels;
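+                // heuristic spill fan-out: roughly the square root of the
+                // fudge-factor-scaled build size per join partition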
+                private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
+
+                @Override
+                public void setFrameWriter(int index, IFrameWriter writer) {
+                    throw new IllegalArgumentException();
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
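+                    // flush each partition's non-empty staging frame to its
+                    // spill file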
+                    for (int i = 0; i < numPartitions; i++) {
+                        try {
+                            ByteBuffer head = outbufs[i];
+                            accessor0.reset(head);
+                            if (accessor0.getTupleCount() > 0) {
+                                FileChannel wChannel = channels[i];
+                                if (wChannel == null) {
+                                    wChannel = new RandomAccessFile(files[i], "rw").getChannel();
+                                    channels[i] = wChannel;
+                                }
+                                wChannel.write(head);
+                                numWrite++;
+                            }
+                        } catch (IOException e) {
+                            throw new HyracksDataException("error generating partition " + files[i].getName());
+                        }
+                    }
+
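+                    // publish the spill channels and the fan-out to the join
+                    // activity through the shared operator environment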
+                    env.set(relationName, channels);
+                    env.set(NUM_PARTITION, numPartitions);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    accessor0.reset(buffer);
+                    int tCount = accessor0.getTupleCount();
+                    for (int i = 0; i < tCount; ++i) {
+
+                        int entry = hpc.partition(accessor0, i, numPartitions);
+                        boolean newBuffer = false;
+                        ByteBuffer outbuf = outbufs[entry];
+                        while (true) {
+                            appender.reset(outbuf, newBuffer);
+                            if (appender.append(accessor0, i)) {
+                                break;
+                            } else {
+                                // buffer is full, i.e. the tuple does not
+                                // fit; spill the frame to disk and retry
+                                try {
+
+                                    FileChannel wChannel = channels[entry];
+                                    if (wChannel == null) {
+                                        wChannel = new RandomAccessFile(files[entry], "rw").getChannel();
+                                        channels[entry] = wChannel;
+                                    }
+
+                                    wChannel.write(outbuf);
+                                    numWrite++;
+                                    outbuf.clear();
+                                    newBuffer = true;
+                                } catch (IOException e) {
+                                    throw new HyracksDataException("error generating partition "
+                                            + files[entry].getName());
+                                }
+                            }
+                        }
+                    }
+                }
+
+                @Override
+                public void open() throws HyracksDataException {
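+                    // create a temp spill file and a staging frame per
+                    // partition; file channels are opened lazily on first write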
+                    outbufs = new ByteBuffer[numPartitions];
+                    files = new File[numPartitions];
+                    channels = new FileChannel[numPartitions];
+                    for (int i = 0; i < numPartitions; i++) {
+                        try {
+                            files[i] = ctx.getResourceManager().createFile(relationName, null);
+                            files[i].deleteOnExit();
+                            outbufs[i] = ctx.getResourceManager().allocateFrame();
+                        } catch (IOException e) {
+                            throw new HyracksDataException(e);
+                        }
+                    }
+                }
+
+            };
+            return op;
+        }
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return GraceHashJoinOperatorDescriptor.this;
+        }
+
+        @Override
+        public boolean supportsPullInterface() {
+            return false;
+        }
+
+        @Override
+        public boolean supportsPushInterface() {
+            return true;
+        }
+
+    }
+
+    private class JoinActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
+                int partition, int nPartitions) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
+                final IOperatorEnvironment env, int partition, int nPartitions) {
+            final RecordDescriptor rd0 = plan.getJobSpecification()
+                    .getOperatorInputRecordDescriptor(getOperatorId(), 0);
+            final RecordDescriptor rd1 = plan.getJobSpecification()
+                    .getOperatorInputRecordDescriptor(getOperatorId(), 1);
+            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+            for (int i = 0; i < comparatorFactories.length; ++i) {
+                comparators[i] = comparatorFactories[i].createBinaryComparator();
+            }
+
+            IOperatorNodePushable op = new IOperatorNodePushable() {
+                private InMemoryHashJoin joiner;
+                private ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0,
+                        hashFunctionFactories).createPartitioner();
+                private ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1,
+                        hashFunctionFactories).createPartitioner();
+
+                private IFrameWriter writer;
+                private FileChannel[] channelsR;
+                private FileChannel[] channelsS;
+                private int numPartitions;
+                private int[] maxBufferRi;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    channelsR = (FileChannel[]) env.get(SMALLRELATION);
+                    channelsS = (FileChannel[]) env.get(LARGERELATION);
+                    numPartitions = (Integer) env.get(NUM_PARTITION);
+
+                    ITuplePartitionComputer hpcRep0 = new RepartitionComputer(numPartitions, hpc0);
+                    ITuplePartitionComputer hpcRep1 = new RepartitionComputer(numPartitions, hpc1);
+
+                    writer.open(); // output frames are emitted during the probe passes below
+
+                    maxBufferRi = new int[numPartitions];
+
+                    ByteBuffer buffer = ctx.getResourceManager().allocateFrame(); // input buffer
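+                    // hash table size scaled from the expected per-partition
+                    // tuple count by the fudge factor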
+                    int tableSize = (int) (numPartitions * recordsPerFrame * factor);
+                    for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
+                        int counter = 0;
+                        int state = 0;
+                        try {
+                            FileChannel inChannelR = channelsR[partitionid];
+                            if (inChannelR != null) {
+                                inChannelR.position(0);
+                                while (state != -1) {
+
+                                    joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx, rd0),
+                                            hpcRep0, new FrameTupleAccessor(ctx, rd1), hpcRep1,
+                                            new FrameTuplePairComparator(keys0, keys1, comparators));
+                                    // build: load the current RelR partition
+                                    // into the in-memory hash table
+
+                                    state = inChannelR.read(buffer);
+                                    while (state != -1) {
+
+                                        ByteBuffer copyBuffer = ctx.getResourceManager().allocateFrame();
+                                        FrameUtils.copy(buffer, copyBuffer);
+                                        joiner.build(copyBuffer);
+                                        numReadI1++;
+                                        counter++;
+                                        if (counter > maxBufferRi[partitionid])
+                                            maxBufferRi[partitionid] = counter;
+
+                                        buffer.clear();
+                                        state = inChannelR.read(buffer);
+                                    }
+
+                                    // probe: stream the matching RelS
+                                    // partition against the hash table
+
+                                    buffer.clear();
+
+                                    FileChannel inChannelS = channelsS[partitionid];
+                                    if (inChannelS != null) {
+                                        inChannelS.position(0);
+                                        while (inChannelS.read(buffer) != -1) {
+                                            joiner.join(buffer, writer);
+                                            numReadI2++;
+                                            buffer.clear();
+                                        }
+                                        inChannelS.close();
+                                        joiner.closeJoin(writer);
+                                    }
+                                }
+                                inChannelR.close();
+                            }
+                        } catch (IOException e) {
+                            throw new HyracksDataException(e);
+                        }
+                    }
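+                    // the entire join runs inside open(), one partition pair
+                    // at a time, so the output writer is closed here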
+                    writer.close();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
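+                    // inputs arrive through the spill files; no frames are
+                    // ever pushed to this activity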
+                    throw new IllegalStateException();
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    env.set(LARGERELATION, null);
+                    env.set(SMALLRELATION, null);
+                    env.set(NUM_PARTITION, null);
+                }
+
+                @Override
+                public void setFrameWriter(int index, IFrameWriter writer) {
+                    if (index != 0) {
+                        throw new IllegalStateException();
+                    }
+                    this.writer = writer;
+                }
+            };
+            return op;
+        }
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return GraceHashJoinOperatorDescriptor.this;
+        }
+
+        @Override
+        public boolean supportsPullInterface() {
+            return false;
+        }
+
+        @Override
+        public boolean supportsPushInterface() {
+            return true;
+        }
+    }
+}
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java
index 38fbc69..51fc7c1 100644
--- a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/InMemoryHashJoinOperatorDescriptor.java
@@ -80,13 +80,13 @@
 
         @Override
         public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition) {
+                int partition, int nPartitions) {
             throw new UnsupportedOperationException();
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition) {
+                final IOperatorEnvironment env, int partition, int nPartitions) {
             final RecordDescriptor rd0 = plan.getJobSpecification()
                     .getOperatorInputRecordDescriptor(getOperatorId(), 0);
             final RecordDescriptor rd1 = plan.getJobSpecification()
@@ -150,13 +150,13 @@
 
         @Override
         public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
-                int partition) {
+                int partition, int nPartitions) {
             throw new UnsupportedOperationException();
         }
 
         @Override
         public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
-                final IOperatorEnvironment env, int partition) {
+                final IOperatorEnvironment env, int partition, int nPartitions) {
             IOperatorNodePushable op = new IOperatorNodePushable() {
                 private IFrameWriter writer;
                 private InMemoryHashJoin joiner;
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/RepartitionComputer.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/RepartitionComputer.java
new file mode 100644
index 0000000..c5c3af8
--- /dev/null
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/RepartitionComputer.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.coreops.join;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
+
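+/**
+ * Wraps a tuple partitioner so that hashing within a single spill partition
+ * stays consistent with the partitioning that produced the spill files: the
+ * delegate is evaluated over factor * nParts buckets and the result is folded
+ * back into nParts.
+ */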
+public class RepartitionComputer implements ITuplePartitionComputer {
+    private int factor;
+    private ITuplePartitionComputer delegate;
+
+    public RepartitionComputer(int factor, ITuplePartitionComputer delegate) {
+        super();
+        this.factor = factor;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public int partition(FrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+        return delegate.partition(accessor, tIndex, factor * nParts) / factor;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 6770fd8..64a59c4 100644
--- a/hyracks/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -40,6 +40,7 @@
 import edu.uci.ics.hyracks.coreops.data.StringSerializerDeserializer;
 import edu.uci.ics.hyracks.coreops.file.CSVFileScanOperatorDescriptor;
 import edu.uci.ics.hyracks.coreops.file.FileSplit;
+import edu.uci.ics.hyracks.coreops.join.GraceHashJoinOperatorDescriptor;
 import edu.uci.ics.hyracks.coreops.join.InMemoryHashJoinOperatorDescriptor;
 
 public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
@@ -51,87 +52,122 @@
     public void customerOrderCIDJoin() throws Exception {
         JobSpecification spec = new JobSpecification();
 
-        FileSplit[] custSplits = new FileSplit[] {
-            new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl"))
-        };
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
 
-        FileSplit[] ordersSplits = new FileSplit[] {
-            new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl"))
-        };
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE });
 
         CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-            '|', "'\"");
-        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+                '|', "'\"");
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
         CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-            "'\"");
-        PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+                "'\"");
+        PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
-            1
-        }, new int[] {
-            0
-        }, new IBinaryHashFunctionFactory[] {
-            StringBinaryHashFunctionFactory.INSTANCE
-        }, new IBinaryComparatorFactory[] {
-            StringBinaryComparatorFactory.INSTANCE
-        }, custOrderJoinDesc, 128);
-        PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
+                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
+        PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         join.setPartitionConstraint(joinPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        printer.setPartitionConstraint(printerPartitionConstraint);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDGraceJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE });
+
+        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
+                '|', "'\"");
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+
+        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
+                "'\"");
+        PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        custScanner.setPartitionConstraint(custPartitionConstraint);
+
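+        // memsize = 4 frames, inputsize0 = 10 frames, 200 records per frame,
+        // fudge factor 1.2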
+        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 4, 10, 200, 1.2,
+                new int[] { 1 }, new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+        PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        join.setPartitionConstraint(joinPartitionConstraint);
+
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -152,104 +188,143 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] {
-            new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
-            new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
-        };
+                new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+                new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
 
         FileSplit[] ordersSplits = new FileSplit[] {
-            new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
-            new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
-        };
+                new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+                new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE });
 
         CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-            '|', "'\"");
+                '|', "'\"");
         PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
         CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-            "'\"");
+                "'\"");
         PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
-            1
-        }, new int[] {
-            0
-        }, new IBinaryHashFunctionFactory[] {
-            StringBinaryHashFunctionFactory.INSTANCE
-        }, new IBinaryComparatorFactory[] {
-            StringBinaryComparatorFactory.INSTANCE
-        }, custOrderJoinDesc, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
+                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
         PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         join.setPartitionConstraint(joinPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
-            new FieldHashPartitionComputerFactory(new int[] {
-                1
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }));
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
-            new FieldHashPartitionComputerFactory(new int[] {
-                0
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }));
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDGraceJoinMulti() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+                new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+                new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE });
+
+        CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
+                '|', "'\"");
+        PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+
+        CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
+                "'\"");
+        PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        custScanner.setPartitionConstraint(custPartitionConstraint);
+
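+        // Grace hash join on custkey: orders field 1 against customer field 0.
+        // The numeric arguments (3, 20, 100, 1.2) are read here, without
+        // certainty, as the frame budget, build-input size estimate, records
+        // per frame, and hash-table fudge factor.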
+        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
+                new int[] { 1 }, new int[] { 0 },
+                new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+        PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+        join.setPartitionConstraint(joinPartitionConstraint);
+
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+        printer.setPartitionConstraint(printerPartitionConstraint);
+
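+        // Hash-partition each input on its join key so that tuples with the
+        // same custkey arrive at the same join partition.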
+        IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -264,101 +339,65 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] {
-            new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
-            new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
-        };
+                new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+                new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
 
         FileSplit[] ordersSplits = new FileSplit[] {
-            new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
-            new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
-        };
+                new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+                new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE });
 
         CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-            '|', "'\"");
+                '|', "'\"");
         PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
         CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-            "'\"");
+                "'\"");
         PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
-            1
-        }, new int[] {
-            0
-        }, new IBinaryHashFunctionFactory[] {
-            StringBinaryHashFunctionFactory.INSTANCE
-        }, new IBinaryComparatorFactory[] {
-            StringBinaryComparatorFactory.INSTANCE
-        }, custOrderJoinDesc, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
+                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
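+        // Only the number of join partitions is constrained (the trailing 128
+        // is presumably the in-memory hash table size); the scheduler may
+        // place the two partitions on any node controllers.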
         join.setPartitionConstraint(new PartitionCountConstraint(2));
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
-            new FieldHashPartitionComputerFactory(new int[] {
-                1
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }));
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(ordJoinConn, ordScanner, 0, join, 0);
 
         IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
-            new FieldHashPartitionComputerFactory(new int[] {
-                0
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }));
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(custJoinConn, custScanner, 0, join, 1);
 
         IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -373,114 +412,75 @@
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] custSplits = new FileSplit[] {
-            new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
-            new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
-        };
+                new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+                new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
         RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
 
         FileSplit[] ordersSplits = new FileSplit[] {
-            new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
-            new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
-        };
+                new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+                new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
         RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE });
 
         RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE,
-            StringSerializerDeserializer.INSTANCE
-        });
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+                StringSerializerDeserializer.INSTANCE });
 
         CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
-            '|', "'\"");
+                '|', "'\"");
         PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         ordScanner.setPartitionConstraint(ordersPartitionConstraint);
 
         CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
-            "'\"");
+                "'\"");
         PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         custScanner.setPartitionConstraint(custPartitionConstraint);
 
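+        // Materialize each repartitioned input so the join reads fully staged
+        // local partitions over the one-to-one edges set up below.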
         MaterializingOperatorDescriptor ordMat = new MaterializingOperatorDescriptor(spec, ordersDesc);
         ordMat.setPartitionConstraint(new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        }));
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
 
         MaterializingOperatorDescriptor custMat = new MaterializingOperatorDescriptor(spec, custDesc);
         custMat.setPartitionConstraint(new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        }));
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
 
-        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
-            1
-        }, new int[] {
-            0
-        }, new IBinaryHashFunctionFactory[] {
-            StringBinaryHashFunctionFactory.INSTANCE
-        }, new IBinaryComparatorFactory[] {
-            StringBinaryComparatorFactory.INSTANCE
-        }, custOrderJoinDesc, 128);
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
+                new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
         PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
-        });
+                new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
         join.setPartitionConstraint(joinPartitionConstraint);
 
         PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
-        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
-            new AbsoluteLocationConstraint(NC1_ID)
-        });
+        PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+                new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
         printer.setPartitionConstraint(printerPartitionConstraint);
 
         IConnectorDescriptor ordPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
-            new FieldHashPartitionComputerFactory(new int[] {
-                1
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }));
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(ordPartConn, ordScanner, 0, ordMat, 0);
 
         IConnectorDescriptor custPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
-            new FieldHashPartitionComputerFactory(new int[] {
-                0
-            }, new IBinaryHashFunctionFactory[] {
-                StringBinaryHashFunctionFactory.INSTANCE
-            }));
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
         spec.connect(custPartConn, custScanner, 0, custMat, 0);
 
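+        // Repartitioning already happened on the way into the materializers,
+        // so the join consumes the materialized partitions via one-to-one
+        // connectors.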
         IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);