Merged r432 from trunk

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@433 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index a431939..0987bd2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -16,6 +16,7 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
 import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
 
@@ -25,4 +26,6 @@
     public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory);
 
     public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+    
+    public ICCContext getCCContext();
 }
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
new file mode 100644
index 0000000..266ebe9
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface ICCContext {
+    public Map<String, Set<String>> getIPAddressNodeMap();
+}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 1664531..ec8ab76 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -20,6 +20,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
 import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
@@ -91,7 +92,8 @@
      * @param plan
      *            - Job Plan
      */
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan);
+    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+            ICCApplicationContext appCtx);
 
     /**
      * Indicate which consumer partitions may receive data from the given producer partition.
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index 3731459..78847eb 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobActivityGraph;
@@ -73,7 +74,8 @@
      * @param plan
      *            - Job Plan
      */
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan);
+    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+            ICCApplicationContext appCtx);
 
     /**
      * Translates this operator descriptor to JSON.
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
new file mode 100644
index 0000000..7552c17
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface INullWriter {
+    public void writeNull(DataOutput out) throws HyracksDataException;
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
new file mode 100644
index 0000000..6d9a744
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INullWriterFactory extends Serializable {
+    public INullWriter createNullWriter();
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
new file mode 100644
index 0000000..3251944
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITuplePairComparator {
+
+    public int compare(IFrameTupleAccessor outerRef, int outerIndex, IFrameTupleAccessor innerRef, int innerIndex)
+            throws HyracksDataException;
+
+}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
new file mode 100644
index 0000000..26cb525
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITuplePairComparatorFactory extends Serializable {
+
+    public ITuplePairComparator createTuplePairComparator();
+}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 4210a23..d0561ea 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -24,6 +24,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
@@ -35,6 +36,7 @@
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
 import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -67,6 +69,7 @@
 import edu.uci.ics.hyracks.control.common.base.CCConfig;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
 import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.base.NCConfig;
 import edu.uci.ics.hyracks.control.common.base.NodeParameters;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
@@ -82,6 +85,8 @@
 
     private final Map<String, NodeControllerState> nodeRegistry;
 
+    private final Map<String, Set<String>> ipAddressNodeNameMap;
+
     private final Map<String, CCApplicationContext> applications;
 
     private final ServerContext serverCtx;
@@ -102,9 +107,12 @@
 
     private final CCClientInterface ccci;
 
+    private final ICCContext ccContext;
+
     public ClusterControllerService(CCConfig ccConfig) throws Exception {
         this.ccConfig = ccConfig;
         nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+        ipAddressNodeNameMap = new HashMap<String, Set<String>>();
         applications = new Hashtable<String, CCApplicationContext>();
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(
                 ClusterControllerService.class.getName()));
@@ -115,6 +123,12 @@
         scheduler = new DefaultJobScheduler(this);
         this.timer = new Timer(true);
         ccci = new CCClientInterface(this);
+        ccContext = new ICCContext() {
+            @Override
+            public Map<String, Set<String>> getIPAddressNodeMap() {
+                return ipAddressNodeNameMap;
+            }
+        };
     }
 
     @Override
@@ -162,6 +176,10 @@
         return nodeRegistry;
     }
 
+    public Map<String, Set<String>> getIPAddressNodeNameMap() {
+        return ipAddressNodeNameMap;
+    }
+
     public CCConfig getConfig() {
         return ccConfig;
     }
@@ -178,7 +196,8 @@
     @Override
     public NodeParameters registerNode(INodeController nodeController) throws Exception {
         String id = nodeController.getId();
-        NodeControllerState state = new NodeControllerState(nodeController);
+        NCConfig ncConfig = nodeController.getConfiguration();
+        NodeControllerState state = new NodeControllerState(nodeController, ncConfig);
         jobQueue.scheduleAndSync(new RegisterNodeEvent(this, id, state));
         nodeController.notifyRegistration(this);
         LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
@@ -249,7 +268,7 @@
             if (applications.containsKey(appName)) {
                 throw new HyracksException("Duplicate application with name: " + appName + " being created.");
             }
-            CCApplicationContext appCtx = new CCApplicationContext(serverCtx, appName);
+            CCApplicationContext appCtx = new CCApplicationContext(serverCtx, ccContext, appName);
             applications.put(appName, appCtx);
         }
     }
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index e614bc5..933504e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -1,14 +1,39 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.hyracks.control.cc;
 
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
 import edu.uci.ics.hyracks.control.common.base.INodeController;
+import edu.uci.ics.hyracks.control.common.base.NCConfig;
 
 public class NodeControllerState {
     private final INodeController nodeController;
 
+    private final NCConfig ncConfig;
+
+    private final Set<UUID> activeJobIds;
+
     private int lastHeartbeatDuration;
 
-    public NodeControllerState(INodeController nodeController) {
+    public NodeControllerState(INodeController nodeController, NCConfig ncConfig) {
         this.nodeController = nodeController;
+        this.ncConfig = ncConfig;
+        activeJobIds = new HashSet<UUID>();
     }
 
     public void notifyHeartbeat() {
@@ -26,4 +51,12 @@
     public INodeController getNodeController() {
         return nodeController;
     }
+
+    public NCConfig getNCConfig() {
+        return ncConfig;
+    }
+
+    public Set<UUID> getActiveJobIds() {
+        return activeJobIds;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 5ca0269..e375aad 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -1,3 +1,17 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package edu.uci.ics.hyracks.control.cc.application;
 
 import java.io.IOException;
@@ -8,6 +22,7 @@
 
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
 import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
@@ -17,12 +32,15 @@
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 
 public class CCApplicationContext extends ApplicationContext implements ICCApplicationContext {
+    private final ICCContext ccContext;
+
     private IJobSpecificationFactory jobSpecFactory;
 
     private List<IJobLifecycleListener> jobLifecycleListeners;
 
-    public CCApplicationContext(ServerContext serverCtx, String appName) throws IOException {
+    public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext, String appName) throws IOException {
         super(serverCtx, appName);
+        this.ccContext = ccContext;
         jobSpecFactory = DeserializingJobSpecificationFactory.INSTANCE;
         jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
     }
@@ -33,6 +51,10 @@
         bootstrap.start();
     }
 
+    public ICCContext getCCContext() {
+        return ccContext;
+    }
+
     @Override
     public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory) {
         this.jobSpecFactory = jobSpecFactory;
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
index 6316b3e..5f9377b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
@@ -14,7 +14,9 @@
  */
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -38,5 +40,13 @@
             throw new Exception("Node with this name already registered.");
         }
         nodeMap.put(nodeId, state);
+        Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
+        String ipAddress = state.getNCConfig().dataIPAddress;
+        Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
+        if (nodes == null) {
+            nodes = new HashSet<String>();
+            ipAddressNodeNameMap.put(ipAddress, nodes);
+        }
+        nodes.add(nodeId);
     }
 }
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index b64a784..6e000cc 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -44,9 +44,17 @@
                 LOGGER.info(e.getKey() + " considered dead");
             }
         }
+        Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
         for (String deadNode : deadNodes) {
             NodeControllerState state = nodeMap.remove(deadNode);
             // Deal with dead tasks.
+            String ipAddress = state.getNCConfig().dataIPAddress;
+            Set<String> ipNodes = ipAddressNodeNameMap.get(ipAddress);
+            if (ipNodes != null) {
+                if (ipNodes.remove(deadNode) && ipNodes.isEmpty()) {
+                    ipAddressNodeNameMap.remove(ipAddress);
+                }
+            }
         }
     }
 
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
index a15469a..12215ff 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
@@ -25,6 +25,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.Constraint;
 import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
@@ -230,6 +231,7 @@
         try {
             solver = new PartitionConstraintSolver();
             final JobActivityGraph jag = jobRun.getJobActivityGraph();
+            final ICCApplicationContext appCtx = ccs.getApplicationMap().get(jag.getApplicationName());
             JobSpecification spec = jag.getJobSpecification();
             final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
             final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
@@ -241,13 +243,13 @@
             PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
                 @Override
                 public void visit(IOperatorDescriptor op) {
-                    op.contributeSchedulingConstraints(acceptor, jag);
+                    op.contributeSchedulingConstraints(acceptor, jag, appCtx);
                 }
             });
             PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
                 @Override
                 public void visit(IConnectorDescriptor conn) {
-                    conn.contributeSchedulingConstraints(acceptor, jag);
+                    conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
                 }
             });
             contributedConstraints.addAll(spec.getUserConstraints());
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index c4c396f..eac578e 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -24,7 +24,6 @@
 import java.rmi.registry.Registry;
 import java.text.MessageFormat;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -133,7 +132,7 @@
         partitionManager = new PartitionManager(this, connectionManager.getNetworkAddress());
         connectionManager.setPartitionRequestListener(partitionManager);
 
-        jobletMap = new HashMap<UUID, Joblet>();
+        jobletMap = new Hashtable<UUID, Joblet>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
                 NodeControllerService.class.getName()), id));
@@ -256,26 +255,25 @@
                     LOGGER.info("Initializing " + taId + " -> " + han);
                 }
                 final int partition = tid.getPartition();
-                Task task = new Task(joblet, taId, han.getClass().getName());
+                Task task = new Task(joblet, taId, han.getClass().getName(), executor);
                 IOperatorNodePushable operator = han.createPushRuntime(task,
                         joblet.getEnvironment(han.getActivityId().getOperatorDescriptorId(), partition), rdp,
                         partition, td.getPartitionCount());
 
-                IPartitionCollector collector = null;
+                List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
 
                 List<IConnectorDescriptor> inputs = plan.getActivityInputConnectorDescriptors(tid.getActivityId());
                 if (inputs != null) {
                     for (int i = 0; i < inputs.size(); ++i) {
-                        if (i >= 1) {
-                            throw new HyracksException("Multiple inputs to an activity not currently supported");
-                        }
                         IConnectorDescriptor conn = inputs.get(i);
                         IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("input: " + i + ": " + conn.getConnectorId());
                         }
                         RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
-                        collector = createPartitionCollector(td, partition, task, i, conn, recordDesc, cPolicy);
+                        IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
+                                recordDesc, cPolicy);
+                        collectors.add(collector);
                     }
                 }
                 List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getActivityId());
@@ -297,7 +295,7 @@
                     }
                 }
 
-                task.setTaskRuntime(collector, operator);
+                task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
                 joblet.addTask(task);
 
                 task.start();
@@ -355,7 +353,7 @@
     }
 
     @Override
-    public synchronized void cleanUpJob(UUID jobId) throws Exception {
+    public void cleanUpJob(UUID jobId) throws Exception {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Cleaning up after job: " + jobId);
         }
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index ac517cf..d5ce485 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -17,6 +17,8 @@
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
 
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -46,29 +48,32 @@
 
     private final String displayName;
 
+    private final Executor executor;
+
     private final IWorkspaceFileFactory fileFactory;
 
     private final DefaultDeallocatableRegistry deallocatableRegistry;
 
     private final Map<String, Counter> counterMap;
 
-    private IPartitionCollector collector;
+    private IPartitionCollector[] collectors;
 
     private IOperatorNodePushable operator;
 
     private volatile boolean aborted;
 
-    public Task(Joblet joblet, TaskAttemptId taskId, String displayName) {
+    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor) {
         this.joblet = joblet;
         this.taskAttemptId = taskId;
         this.displayName = displayName;
+        this.executor = executor;
         fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         counterMap = new HashMap<String, Counter>();
     }
 
-    public void setTaskRuntime(IPartitionCollector collector, IOperatorNodePushable operator) {
-        this.collector = collector;
+    public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) {
+        this.collectors = collectors;
         this.operator = operator;
     }
 
@@ -145,8 +150,8 @@
 
     public void abort() {
         aborted = true;
-        if (collector != null) {
-            collector.abort();
+        for (IPartitionCollector c : collectors) {
+            c.abort();
         }
     }
 
@@ -158,36 +163,27 @@
             ct.setName(displayName + ": " + taskAttemptId);
             operator.initialize();
             try {
-                if (collector != null) {
-                    if (aborted) {
-                        return;
-                    }
-                    collector.open();
-                    try {
-                        joblet.advertisePartitionRequest(collector.getRequiredPartitionIds(), collector);
-                        IFrameReader reader = collector.getReader();
-                        reader.open();
-                        try {
-                            IFrameWriter writer = operator.getInputFrameWriter(0);
-                            writer.open();
-                            try {
-                                ByteBuffer buffer = allocateFrame();
-                                while (reader.nextFrame(buffer)) {
-                                    if (aborted) {
-                                        return;
-                                    }
-                                    buffer.flip();
-                                    writer.nextFrame(buffer);
-                                    buffer.compact();
+                if (collectors.length > 0) {
+                    final Semaphore sem = new Semaphore(collectors.length - 1);
+                    for (int i = 1; i < collectors.length; ++i) {
+                        final IPartitionCollector collector = collectors[i];
+                        final IFrameWriter writer = operator.getInputFrameWriter(i);
+                        sem.acquire();
+                        executor.execute(new Runnable() {
+                            public void run() {
+                                try {
+                                    pushFrames(collector, writer);
+                                } catch (HyracksDataException e) {
+                                } finally {
+                                    sem.release();
                                 }
-                            } finally {
-                                writer.close();
                             }
-                        } finally {
-                            reader.close();
-                        }
+                        });
+                    }
+                    try {
+                        pushFrames(collectors[0], operator.getInputFrameWriter(0));
                     } finally {
-                        collector.close();
+                        sem.acquire(collectors.length - 1);
                     }
                 }
             } finally {
@@ -202,4 +198,42 @@
             close();
         }
     }
+
+    private void pushFrames(IPartitionCollector collector, IFrameWriter writer) throws HyracksDataException {
+        if (aborted) {
+            return;
+        }
+        try {
+            collector.open();
+            try {
+                joblet.advertisePartitionRequest(collector.getRequiredPartitionIds(), collector);
+                IFrameReader reader = collector.getReader();
+                reader.open();
+                try {
+                    writer.open();
+                    try {
+                        ByteBuffer buffer = allocateFrame();
+                        while (reader.nextFrame(buffer)) {
+                            if (aborted) {
+                                return;
+                            }
+                            buffer.flip();
+                            writer.nextFrame(buffer);
+                            buffer.compact();
+                        }
+                    } finally {
+                        writer.close();
+                    }
+                } finally {
+                    reader.close();
+                }
+            } finally {
+                collector.close();
+            }
+        } catch (HyracksException e) {
+            throw new HyracksDataException(e);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 4ec1b6f..7647e50 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -118,30 +118,32 @@
         return false;
     }
     
-    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots, byte[] bytes, int offset,
-            int length) {
+    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1,
+            int dataLen1) {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
         int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
         int length0 = endOffset0 - startOffset0;
 
-        if (tupleDataEndOffset + length0 + fieldSlots.length * 4 + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+        int slotsLen1 = fieldSlots1.length * 4;
+        int length1 = slotsLen1 + dataLen1;
+        
+        if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
             ByteBuffer src0 = accessor0.getBuffer();
             int slotsLen0 = accessor0.getFieldSlotsLength();
             int dataLen0 = length0 - slotsLen0;
             // Copy slots from accessor0 verbatim
             System.arraycopy(src0.array(), startOffset0, buffer.array(), tupleDataEndOffset, slotsLen0);
-            // Copy slots from fieldSlots with the following transformation:
-            // newSlotIdx = oldSlotIdx + dataLen0
-            for (int i = 0; i < fieldSlots.length; ++i) {
-                buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots[i] + dataLen0));
+            // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+            for (int i = 0; i < fieldSlots1.length; ++i) {
+                buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0));
             }
             // Copy data0
             System.arraycopy(src0.array(), startOffset0 + slotsLen0, buffer.array(), tupleDataEndOffset + slotsLen0
-                    + fieldSlots.length * 4, dataLen0);
-            // Copy bytes
-            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + slotsLen0 + fieldSlots.length * 4
-                    + dataLen0, length);
-            tupleDataEndOffset += (length0 + fieldSlots.length * 4 + length);
+                    + slotsLen1, dataLen0);
+            // Copy bytes1
+            System.arraycopy(bytes1, offset1, buffer.array(), tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4
+                    + dataLen0, dataLen1);
+            tupleDataEndOffset += (length0 + length1);
             buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
             buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
@@ -150,6 +152,41 @@
         return false;
     }
 
+    public boolean appendConcat(int[] fieldSlots0, byte[] bytes0, int offset0, int dataLen0, IFrameTupleAccessor accessor1,
+            int tIndex1) {
+        int slotsLen0 = fieldSlots0.length * 4;
+        int length0 = slotsLen0 + dataLen0;
+        
+        int startOffset1 = accessor1.getTupleStartOffset(tIndex1);
+        int endOffset1 = accessor1.getTupleEndOffset(tIndex1);
+        int length1 = endOffset1 - startOffset1;
+        
+        if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+            ByteBuffer src1 = accessor1.getBuffer();
+            int slotsLen1 = accessor1.getFieldSlotsLength();
+            int dataLen1 = length1 - slotsLen1;
+            // Copy fieldSlots0 verbatim
+            for (int i = 0; i < fieldSlots0.length; ++i) {
+                buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots0[i]);
+            }
+            // Copy slots from accessor1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+            for (int i = 0; i < slotsLen1 / 4; ++i) {
+                buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, src1.getInt(startOffset1 + i * 4) + dataLen0);
+            }
+            // Copy bytes0
+            System.arraycopy(bytes0, offset0, buffer.array(), tupleDataEndOffset + slotsLen0 + slotsLen1, 
+                    dataLen0);
+            // Copy data1
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, buffer.array(), tupleDataEndOffset + slotsLen0 
+                    + slotsLen1 + dataLen0, dataLen1);
+            tupleDataEndOffset += (length0 + length1);
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            return true;
+        }
+        return false;
+    }
     public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) {
         int fTargetSlotsLength = fields.length * 4;
         int length = fTargetSlotsLength;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 19965b9..76cb1f0 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -50,7 +51,8 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan) {
+    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+            ICCApplicationContext appCtx) {
         // do nothing
     }
 }
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 8b16068..97ca255 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
@@ -68,7 +69,8 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan) {
+    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+            ICCApplicationContext appCtx) {
         // do nothing
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index 556466f..67130c9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -16,6 +16,7 @@
 
 import java.util.BitSet;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
 import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
@@ -55,7 +56,8 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan) {
+    public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
+            ICCApplicationContext appCtx) {
         JobSpecification jobSpec = plan.getJobSpecification();
         IOperatorDescriptor consumer = jobSpec.getConsumer(this);
         IOperatorDescriptor producer = jobSpec.getProducer(this);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 27ae2db..c60c0af 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -44,8 +46,8 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String SMALLRELATION = "RelR";
-    private static final String LARGERELATION = "RelS";
+    private static final String RELATION0 = "Rel0";
+    private static final String RELATION1 = "Rel1";
 
     private static final long serialVersionUID = 1L;
     private final int[] keys0;
@@ -56,6 +58,8 @@
     private final double factor;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
 
     public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
@@ -69,15 +73,33 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = false;
+        this.nullWriterFactories1 = null;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1) {
+        super(spec, 2, 1);
+        this.memsize = memsize;
+        this.inputsize0 = inputsize0;
+        this.recordsPerFrame = recordsPerFrame;
+        this.factor = factor;
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
         recordDescriptors[0] = recordDescriptor;
     }
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        HashPartitionActivityNode rpart = new HashPartitionActivityNode(new ActivityId(odId, 0), SMALLRELATION, keys0,
-                0);
-        HashPartitionActivityNode spart = new HashPartitionActivityNode(new ActivityId(odId, 1), LARGERELATION, keys1,
-                1);
+        HashPartitionActivityNode rpart = new HashPartitionActivityNode(new ActivityId(odId, 0), RELATION0, keys0, 0);
+        HashPartitionActivityNode spart = new HashPartitionActivityNode(new ActivityId(odId, 1), RELATION1, keys1, 1);
         JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, 2));
 
         builder.addActivity(rpart);
@@ -217,18 +239,24 @@
             }
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
 
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 private InMemoryHashJoin joiner;
 
-                private RunFileWriter[] rWriters;
-                private RunFileWriter[] sWriters;
+                private RunFileWriter[] buildWriters;
+                private RunFileWriter[] probeWriters;
                 private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
 
                 @Override
                 public void initialize() throws HyracksDataException {
-                    rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
-                    sWriters = (RunFileWriter[]) env.get(LARGERELATION);
+                    buildWriters = (RunFileWriter[]) env.get(RELATION1);
+                    probeWriters = (RunFileWriter[]) env.get(RELATION0);
 
                     ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
                             new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
@@ -241,34 +269,36 @@
                     // buffer
                     int tableSize = (int) (numPartitions * recordsPerFrame * factor);
                     for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
-                        RunFileWriter rWriter = rWriters[partitionid];
-                        RunFileWriter sWriter = sWriters[partitionid];
-                        if (rWriter == null || sWriter == null) {
+                        RunFileWriter buildWriter = buildWriters[partitionid];
+                        RunFileWriter probeWriter = probeWriters[partitionid];
+                        if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
                             continue;
                         }
                         joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                                 hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
-                                new FrameTuplePairComparator(keys0, keys1, comparators));
+                                new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
 
                         // build
-                        RunFileReader rReader = rWriter.createReader();
-                        rReader.open();
-                        while (rReader.nextFrame(buffer)) {
-                            ByteBuffer copyBuffer = ctx.allocateFrame();
-                            FrameUtils.copy(buffer, copyBuffer);
-                            joiner.build(copyBuffer);
-                            buffer.clear();
+                        if (buildWriter != null) {
+                            RunFileReader buildReader = buildWriter.createReader();
+                            buildReader.open();
+                            while (buildReader.nextFrame(buffer)) {
+                                ByteBuffer copyBuffer = ctx.allocateFrame();
+                                FrameUtils.copy(buffer, copyBuffer);
+                                joiner.build(copyBuffer);
+                                buffer.clear();
+                            }
+                            buildReader.close();
                         }
-                        rReader.close();
 
                         // probe
-                        RunFileReader sReader = sWriter.createReader();
-                        sReader.open();
-                        while (sReader.nextFrame(buffer)) {
+                        RunFileReader probeReader = probeWriter.createReader();
+                        probeReader.open();
+                        while (probeReader.nextFrame(buffer)) {
                             joiner.join(buffer, writer);
                             buffer.clear();
                         }
-                        sReader.close();
+                        probeReader.close();
                         joiner.closeJoin(writer);
                     }
                     writer.close();
@@ -276,8 +306,8 @@
 
                 @Override
                 public void deinitialize() throws HyracksDataException {
-                    env.set(LARGERELATION, null);
-                    env.set(SMALLRELATION, null);
+                    env.set(RELATION1, null);
+                    env.set(RELATION0, null);
                 }
             };
             return op;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index df7440b..7c9134f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -46,19 +48,21 @@
 
 public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final String JOINER0 = "joiner0";
-    private static final String SMALLRELATION = "RelR";
-    private static final String LARGERELATION = "RelS";
+    private static final String BUILDRELATION = "BuildRel";
+    private static final String PROBERELATION = "ProbeRel";
     private static final String MEM_HASHTABLE = "MEMORY_HASHTABLE";
     private static final String NUM_PARTITION = "NUMBER_B_PARTITIONS"; // B
     private final int memsize;
     private static final long serialVersionUID = 1L;
     private final int inputsize0;
     private final double factor;
+    private final int recordsPerFrame;
     private final int[] keys0;
     private final int[] keys1;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
-    private final int recordsPerFrame;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
 
     /**
      * @param spec
@@ -88,19 +92,39 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = false;
+        this.nullWriterFactories1 = null;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    public HybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+        super(spec, 2, 1);
+        this.memsize = memsize;
+        this.inputsize0 = inputsize0;
+        this.factor = factor;
+        this.recordsPerFrame = recordsPerFrame;
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
         recordDescriptors[0] = recordDescriptor;
     }
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(new ActivityId(odId, 0), SMALLRELATION);
-        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(new ActivityId(odId, 1), LARGERELATION);
+        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(new ActivityId(odId, 0), BUILDRELATION);
+        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(new ActivityId(odId, 1), PROBERELATION);
 
         builder.addActivity(phase1);
-        builder.addSourceEdge(0, phase1, 0);
+        builder.addSourceEdge(1, phase1, 0);
 
         builder.addActivity(phase2);
-        builder.addSourceEdge(1, phase2, 0);
+        builder.addSourceEdge(0, phase2, 0);
 
         builder.addBlockingEdge(phase1, phase2);
 
@@ -125,12 +149,18 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private InMemoryHashJoin joiner0;
-                private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
-                ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
-                        .createPartitioner();
+                private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
+                private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
+                        hashFunctionFactories).createPartitioner();
                 private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
                 private ByteBuffer[] bufferForPartitions;
@@ -146,8 +176,8 @@
 
                     for (int i = 0; i < B; i++) {
                         ByteBuffer buf = bufferForPartitions[i];
-                        accessor0.reset(buf);
-                        if (accessor0.getTupleCount() > 0) {
+                        accessorBuild.reset(buf);
+                        if (accessorBuild.getTupleCount() > 0) {
                             write(i, buf);
                         }
                         closeWriter(i);
@@ -163,17 +193,17 @@
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
 
                     if (memoryForHashtable != memsize - 2) {
-                        accessor0.reset(buffer);
-                        int tCount = accessor0.getTupleCount();
+                        accessorBuild.reset(buffer);
+                        int tCount = accessorBuild.getTupleCount();
                         for (int i = 0; i < tCount; ++i) {
                             int entry = -1;
                             if (memoryForHashtable == 0) {
-                                entry = hpc0.partition(accessor0, i, B);
+                                entry = hpcBuild.partition(accessorBuild, i, B);
                                 boolean newBuffer = false;
                                 ByteBuffer bufBi = bufferForPartitions[entry];
                                 while (true) {
                                     appender.reset(bufBi, newBuffer);
-                                    if (appender.append(accessor0, i)) {
+                                    if (appender.append(accessorBuild, i)) {
                                         break;
                                     } else {
                                         write(entry, bufBi);
@@ -182,15 +212,16 @@
                                     }
                                 }
                             } else {
-                                entry = hpc0.partition(accessor0, i, (int) (inputsize0 * factor / nPartitions));
+                                entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
                                 if (entry < memoryForHashtable) {
                                     while (true) {
-                                        if (!ftappender.append(accessor0, i)) {
+                                        if (!ftappender.append(accessorBuild, i)) {
                                             build(inBuffer);
 
                                             ftappender.reset(inBuffer, true);
-                                        } else
+                                        } else {
                                             break;
+                                        }
                                     }
                                 } else {
                                     entry %= B;
@@ -198,7 +229,7 @@
                                     ByteBuffer bufBi = bufferForPartitions[entry];
                                     while (true) {
                                         appender.reset(bufBi, newBuffer);
-                                        if (appender.append(accessor0, i)) {
+                                        if (appender.append(accessorBuild, i)) {
                                             break;
                                         } else {
                                             write(entry, bufBi);
@@ -253,7 +284,7 @@
                     int tableSize = (int) (memoryForHashtable * recordsPerFrame * factor);
                     joiner0 = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                             hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
-                                    keys0, keys1, comparators));
+                                    keys0, keys1, comparators), isLeftOuter, nullWriters1);
                     bufferForPartitions = new ByteBuffer[B];
                     fWriters = new RunFileWriter[B];
                     for (int i = 0; i < B; i++) {
@@ -291,11 +322,11 @@
 
     private class PartitionAndJoinActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
-        private String largeRelation;
+        private String relationName;
 
         public PartitionAndJoinActivityNode(ActivityId id, String relationName) {
             super(id);
-            this.largeRelation = relationName;
+            this.relationName = relationName;
         }
 
         @Override
@@ -307,22 +338,28 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private InMemoryHashJoin joiner0;
-                private final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
-                private ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
+                private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+                private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
                         hashFunctionFactories);
-                private ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
+                private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
                         hashFunctionFactories);
-                ITuplePartitionComputer hpc1 = hpcf1.createPartitioner();
+                private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
 
                 private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 private final FrameTupleAppender ftap = new FrameTupleAppender(ctx.getFrameSize());
                 private final ByteBuffer inBuffer = ctx.allocateFrame();
                 private final ByteBuffer outBuffer = ctx.allocateFrame();
-                private RunFileWriter[] rWriters;
-                private RunFileWriter[] sWriters;
+                private RunFileWriter[] buildWriters;
+                private RunFileWriter[] probeWriters;
                 private ByteBuffer[] bufferForPartitions;
                 private int B;
                 private int memoryForHashtable;
@@ -331,10 +368,10 @@
                 public void open() throws HyracksDataException {
                     joiner0 = (InMemoryHashJoin) env.get(JOINER0);
                     writer.open();
-                    rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
+                    buildWriters = (RunFileWriter[]) env.get(BUILDRELATION);
                     B = (Integer) env.get(NUM_PARTITION);
                     memoryForHashtable = (Integer) env.get(MEM_HASHTABLE);
-                    sWriters = new RunFileWriter[B];
+                    probeWriters = new RunFileWriter[B];
                     bufferForPartitions = new ByteBuffer[B];
                     for (int i = 0; i < B; i++) {
                         bufferForPartitions[i] = ctx.allocateFrame();
@@ -346,18 +383,18 @@
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                     if (memoryForHashtable != memsize - 2) {
-                        accessor1.reset(buffer);
-                        int tupleCount1 = accessor1.getTupleCount();
-                        for (int i = 0; i < tupleCount1; ++i) {
+                        accessorProbe.reset(buffer);
+                        int tupleCount0 = accessorProbe.getTupleCount();
+                        for (int i = 0; i < tupleCount0; ++i) {
 
                             int entry = -1;
                             if (memoryForHashtable == 0) {
-                                entry = hpc1.partition(accessor1, i, B);
+                                entry = hpcProbe.partition(accessorProbe, i, B);
                                 boolean newBuffer = false;
                                 ByteBuffer outbuf = bufferForPartitions[entry];
                                 while (true) {
                                     appender.reset(outbuf, newBuffer);
-                                    if (appender.append(accessor1, i)) {
+                                    if (appender.append(accessorProbe, i)) {
                                         break;
                                     } else {
                                         write(entry, outbuf);
@@ -366,10 +403,10 @@
                                     }
                                 }
                             } else {
-                                entry = hpc1.partition(accessor1, i, (int) (inputsize0 * factor / nPartitions));
+                                entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
                                 if (entry < memoryForHashtable) {
                                     while (true) {
-                                        if (!ftap.append(accessor1, i)) {
+                                        if (!ftap.append(accessorProbe, i)) {
                                             joiner0.join(inBuffer, writer);
                                             ftap.reset(inBuffer, true);
                                         } else
@@ -382,7 +419,7 @@
                                     ByteBuffer outbuf = bufferForPartitions[entry];
                                     while (true) {
                                         appender.reset(outbuf, newBuffer);
-                                        if (appender.append(accessor1, i)) {
+                                        if (appender.append(accessorProbe, i)) {
                                             break;
                                         } else {
                                             write(entry, outbuf);
@@ -407,8 +444,8 @@
                     if (memoryForHashtable != memsize - 2) {
                         for (int i = 0; i < B; i++) {
                             ByteBuffer buf = bufferForPartitions[i];
-                            accessor1.reset(buf);
-                            if (accessor1.getTupleCount() > 0) {
+                            accessorProbe.reset(buf);
+                            if (accessorProbe.getTupleCount() > 0) {
                                 write(i, buf);
                             }
                             closeWriter(i);
@@ -422,40 +459,43 @@
                             tableSize = (int) (memsize * recordsPerFrame * factor);
                         }
                         for (int partitionid = 0; partitionid < B; partitionid++) {
-                            RunFileWriter rWriter = rWriters[partitionid];
-                            RunFileWriter sWriter = sWriters[partitionid];
-                            if (rWriter == null || sWriter == null) {
+                            RunFileWriter buildWriter = buildWriters[partitionid];
+                            RunFileWriter probeWriter = probeWriters[partitionid];
+                            if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
                                 continue;
                             }
 
                             InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
-                                    hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators));
+                                    hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
+                                    nullWriters1);
 
-                            RunFileReader rReader = rWriter.createReader();
-                            rReader.open();
-                            while (rReader.nextFrame(inBuffer)) {
-                                ByteBuffer copyBuffer = ctx.allocateFrame();
-                                FrameUtils.copy(inBuffer, copyBuffer);
-                                joiner.build(copyBuffer);
-                                inBuffer.clear();
+                            if (buildWriter != null) {
+                                RunFileReader buildReader = buildWriter.createReader();
+                                buildReader.open();
+                                while (buildReader.nextFrame(inBuffer)) {
+                                    ByteBuffer copyBuffer = ctx.allocateFrame();
+                                    FrameUtils.copy(inBuffer, copyBuffer);
+                                    joiner.build(copyBuffer);
+                                    inBuffer.clear();
+                                }
+                                buildReader.close();
                             }
-                            rReader.close();
 
                             // probe
-                            RunFileReader sReader = sWriter.createReader();
-                            sReader.open();
-                            while (sReader.nextFrame(inBuffer)) {
+                            RunFileReader probeReader = probeWriter.createReader();
+                            probeReader.open();
+                            while (probeReader.nextFrame(inBuffer)) {
                                 joiner.join(inBuffer, writer);
                                 inBuffer.clear();
                             }
-                            sReader.close();
+                            probeReader.close();
                             joiner.closeJoin(writer);
                         }
                     }
                     writer.close();
-                    env.set(LARGERELATION, null);
-                    env.set(SMALLRELATION, null);
+                    env.set(PROBERELATION, null);
+                    env.set(BUILDRELATION, null);
                     env.set(JOINER0, null);
                     env.set(MEM_HASHTABLE, null);
                     env.set(NUM_PARTITION, null);
@@ -467,19 +507,19 @@
                 }
 
                 private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = sWriters[i];
+                    RunFileWriter writer = probeWriters[i];
                     if (writer != null) {
                         writer.close();
                     }
                 }
 
                 private void write(int i, ByteBuffer head) throws HyracksDataException {
-                    RunFileWriter writer = sWriters[i];
+                    RunFileWriter writer = probeWriters[i];
                     if (writer == null) {
-                        FileReference file = ctx.createManagedWorkspaceFile(largeRelation);
+                        FileReference file = ctx.createManagedWorkspaceFile(relationName);
                         writer = new RunFileWriter(file, ctx.getIOManager());
                         writer.open();
-                        sWriters[i] = writer;
+                        probeWriters[i] = writer;
                     }
                     writer.nextFrame(head);
                 }
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index a49c21a9..ebeceeb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.join;
 
+import java.io.DataOutput;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -21,8 +22,10 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -30,36 +33,51 @@
 public class InMemoryHashJoin {
     private final Link[] table;
     private final List<ByteBuffer> buffers;
-    private final FrameTupleAccessor accessor0;
-    private final ITuplePartitionComputer tpc0;
-    private final FrameTupleAccessor accessor1;
-    private final ITuplePartitionComputer tpc1;
+    private final FrameTupleAccessor accessorBuild;
+    private final ITuplePartitionComputer tpcBuild;
+    private final FrameTupleAccessor accessorProbe;
+    private final ITuplePartitionComputer tpcProbe;
     private final FrameTupleAppender appender;
     private final FrameTuplePairComparator tpComparator;
     private final ByteBuffer outBuffer;
-
+    private final boolean isLeftOuter;
+    private final ArrayTupleBuilder nullTupleBuild;
+    
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
-            FrameTuplePairComparator comparator) {
+            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
+            throws HyracksDataException {
         table = new Link[tableSize];
         buffers = new ArrayList<ByteBuffer>();
-        this.accessor0 = accessor0;
-        this.tpc0 = tpc0;
-        this.accessor1 = accessor1;
-        this.tpc1 = tpc1;
+        this.accessorBuild = accessor1;
+        this.tpcBuild = tpc1;
+        this.accessorProbe = accessor0;
+        this.tpcProbe = tpc0;
         appender = new FrameTupleAppender(ctx.getFrameSize());
         tpComparator = comparator;
         outBuffer = ctx.allocateFrame();
         appender.reset(outBuffer, true);
+        this.isLeftOuter = isLeftOuter;        
+        if (isLeftOuter) {
+            int fieldCountOuter = accessor1.getFieldCount();
+            nullTupleBuild = new ArrayTupleBuilder(fieldCountOuter);
+            DataOutput out = nullTupleBuild.getDataOutput();
+            for (int i = 0; i < fieldCountOuter; i++) {
+                nullWriters1[i].writeNull(out);
+                nullTupleBuild.addFieldEndOffset();
+            }
+        } else {
+            nullTupleBuild = null;
+        }
     }
 
     public void build(ByteBuffer buffer) throws HyracksDataException {
         buffers.add(buffer);
         int bIndex = buffers.size() - 1;
-        accessor0.reset(buffer);
-        int tCount = accessor0.getTupleCount();
+        accessorBuild.reset(buffer);
+        int tCount = accessorBuild.getTupleCount();
         for (int i = 0; i < tCount; ++i) {
-            int entry = tpc0.partition(accessor0, i, table.length);
+            int entry = tpcBuild.partition(accessorBuild, i, table.length);
             long tPointer = (((long) bIndex) << 32) + i;
             Link link = table[entry];
             if (link == null) {
@@ -70,29 +88,41 @@
     }
 
     public void join(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-        accessor1.reset(buffer);
-        int tupleCount1 = accessor1.getTupleCount();
-        for (int i = 0; i < tupleCount1; ++i) {
-            int entry = tpc1.partition(accessor1, i, table.length);
+        accessorProbe.reset(buffer);
+        int tupleCount0 = accessorProbe.getTupleCount();
+        for (int i = 0; i < tupleCount0; ++i) {
+            int entry = tpcProbe.partition(accessorProbe, i, table.length);
             Link link = table[entry];
+            boolean matchFound = false;
             if (link != null) {
                 for (int j = 0; j < link.size; ++j) {
                     long pointer = link.pointers[j];
                     int bIndex = (int) ((pointer >> 32) & 0xffffffff);
                     int tIndex = (int) (pointer & 0xffffffff);
-                    accessor0.reset(buffers.get(bIndex));
-                    int c = tpComparator.compare(accessor0, tIndex, accessor1, i);
+                    accessorBuild.reset(buffers.get(bIndex));
+                    int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
                     if (c == 0) {
-                        if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+                        matchFound = true;
+                        if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
                             flushFrame(outBuffer, writer);
                             appender.reset(outBuffer, true);
-                            if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+                            if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
                                 throw new IllegalStateException();
                             }
                         }
                     }
                 }
             }
+            if (!matchFound && isLeftOuter) {
+                if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+                    flushFrame(outBuffer, writer);
+                    appender.reset(outBuffer, true);
+                    if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild
+                            .getSize())) {
+                        throw new IllegalStateException();
+                    }                  
+                }
+            }
         }
     }
 
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index b4797e8..ce8be1c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -46,6 +48,8 @@
     private final int[] keys1;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
     private final int tableSize;
 
     public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
@@ -56,8 +60,25 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
-        this.tableSize = tableSize;
         recordDescriptors[0] = recordDescriptor;
+        this.isLeftOuter = false;
+        this.nullWriterFactories1 = null;
+        this.tableSize = tableSize;
+    }
+
+    public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+            int tableSize) {
+        super(spec, 2, 1);
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        recordDescriptors[0] = recordDescriptor;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
+        this.tableSize = tableSize;
     }
 
     @Override
@@ -66,10 +87,11 @@
         HashProbeActivityNode hpa = new HashProbeActivityNode(new ActivityId(odId, 1));
 
         builder.addActivity(hba);
-        builder.addSourceEdge(0, hba, 0);
+        builder.addSourceEdge(1, hba, 0);
 
         builder.addActivity(hpa);
-        builder.addSourceEdge(1, hpa, 0);
+        builder.addSourceEdge(0, hpa, 0);
+
         builder.addTargetEdge(0, hpa, 0);
 
         builder.addBlockingEdge(hba, hpa);
@@ -91,6 +113,13 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
+
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private InMemoryHashJoin joiner;
 
@@ -102,7 +131,7 @@
                             .createPartitioner();
                     joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                             hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
-                                    keys0, keys1, comparators));
+                                    keys0, keys1, comparators), isLeftOuter, nullWriters1);
                 }
 
                 @Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
new file mode 100644
index 0000000..e92d787
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public class NestedLoopJoin {
+    private final FrameTupleAccessor accessorInner;
+    private final FrameTupleAccessor accessorOuter;
+    private final FrameTupleAppender appender;
+    private final ITuplePairComparator tpComparator;
+    private final ByteBuffer outBuffer;
+    private final ByteBuffer innerBuffer;
+    private final List<ByteBuffer> outBuffers;
+    private final int memSize;
+    private final IHyracksTaskContext ctx;
+    private RunFileReader runFileReader;
+    private int currentMemSize = 0;
+    private final RunFileWriter runFileWriter;
+
+    public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
+            ITuplePairComparator comparators, int memSize) throws HyracksDataException {
+        this.accessorInner = accessor1;
+        this.accessorOuter = accessor0;
+        this.appender = new FrameTupleAppender(ctx.getFrameSize());
+        this.tpComparator = comparators;
+        this.outBuffer = ctx.allocateFrame();
+        this.innerBuffer = ctx.allocateFrame();
+        this.appender.reset(outBuffer, true);
+        this.outBuffers = new ArrayList<ByteBuffer>();
+        this.memSize = memSize;
+        this.ctx = ctx;
+
+        FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+                this.getClass().getSimpleName() + this.toString());
+        runFileWriter = new RunFileWriter(file, ctx.getIOManager());
+        runFileWriter.open();
+    }
+
+    public void cache(ByteBuffer buffer) throws HyracksDataException {
+        runFileWriter.nextFrame(buffer);
+        System.out.println(runFileWriter.getFileSize());
+    }
+
+    public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
+        if (outBuffers.size() < memSize - 3) {
+            createAndCopyFrame(outerBuffer);
+            return;
+        }
+        if (currentMemSize < memSize - 3) {
+            reloadFrame(outerBuffer);
+            return;
+        }
+        for (ByteBuffer outBuffer : outBuffers) {
+            runFileReader = runFileWriter.createReader();
+            runFileReader.open();
+            while (runFileReader.nextFrame(innerBuffer)) {
+                blockJoin(outBuffer, innerBuffer, writer);
+            }
+            runFileReader.close();
+        }
+        currentMemSize = 0;
+        reloadFrame(outerBuffer);
+    }
+
+    private void createAndCopyFrame(ByteBuffer outerBuffer) {
+        ByteBuffer outerBufferCopy = ctx.allocateFrame();
+        FrameUtils.copy(outerBuffer, outerBufferCopy);
+        outBuffers.add(outerBufferCopy);
+        currentMemSize++;
+    }
+
+    private void reloadFrame(ByteBuffer outerBuffer) {
+        outBuffers.get(currentMemSize).clear();
+        FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
+        currentMemSize++;
+    }
+
+    private void blockJoin(ByteBuffer outerBuffer, ByteBuffer innerBuffer, IFrameWriter writer)
+            throws HyracksDataException {
+        accessorOuter.reset(outerBuffer);
+        accessorInner.reset(innerBuffer);
+        int tupleCount0 = accessorOuter.getTupleCount();
+        int tupleCount1 = accessorInner.getTupleCount();
+
+        for (int i = 0; i < tupleCount0; ++i) {
+            for (int j = 0; j < tupleCount1; ++j) {
+                int c = compare(accessorOuter, i, accessorInner, j);
+                if (c == 0) {
+                    if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+                        flushFrame(outBuffer, writer);
+                        appender.reset(outBuffer, true);
+                        if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+                            throw new IllegalStateException();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    public void closeCache() throws HyracksDataException {
+        if (runFileWriter != null) {
+            runFileWriter.close();
+        }
+    }
+
+    public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+        for (ByteBuffer outBuffer : outBuffers) {
+            runFileReader = runFileWriter.createReader();
+            runFileReader.open();
+            while (runFileReader.nextFrame(innerBuffer)) {
+                blockJoin(outBuffer, innerBuffer, writer);
+            }
+            runFileReader.close();
+        }
+        outBuffers.clear();
+        currentMemSize = 0;
+
+        if (appender.getTupleCount() > 0) {
+            flushFrame(outBuffer, writer);
+        }
+    }
+
+    private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+        writer.nextFrame(buffer);
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+    }
+
+    private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
+        int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
+        if (c != 0) {
+            return c;
+        }
+        return 0;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
new file mode 100644
index 0000000..5b5ace1
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final String JOINER = "joiner";
+
+    private static final long serialVersionUID = 1L;
+    private final ITuplePairComparatorFactory comparatorFactory;
+    private final int memSize;
+
+    public NestedLoopJoinOperatorDescriptor(JobSpecification spec, ITuplePairComparatorFactory comparatorFactory,
+            RecordDescriptor recordDescriptor, int memSize) {
+        super(spec, 2, 1);
+        this.comparatorFactory = comparatorFactory;
+        this.recordDescriptors[0] = recordDescriptor;
+        this.memSize = memSize;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        JoinCacheActivityNode jc = new JoinCacheActivityNode(new ActivityId(getOperatorId(), 0));
+        NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode(new ActivityId(getOperatorId(), 1));
+
+        builder.addActivity(jc);
+        builder.addSourceEdge(1, jc, 0);
+
+        builder.addActivity(nlj);
+        builder.addSourceEdge(0, nlj, 0);
+
+        builder.addTargetEdge(0, nlj, 0);
+        builder.addBlockingEdge(jc, nlj);
+    }
+
+    private class JoinCacheActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public JoinCacheActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator();
+
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+                private NestedLoopJoin joiner;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    ByteBuffer copyBuffer = ctx.allocateFrame();
+                    FrameUtils.copy(buffer, copyBuffer);
+                    copyBuffer.flip();
+                    joiner.cache(copyBuffer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    joiner.closeCache();
+                    env.set(JOINER, joiner);
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
+            };
+            return op;
+        }
+    }
+
+    private class NestedLoopJoinActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public NestedLoopJoinActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+
+            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private NestedLoopJoin joiner;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    joiner = (NestedLoopJoin) env.get(JOINER);
+                    writer.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    joiner.join(buffer, writer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    joiner.closeJoin(writer);
+                    writer.close();
+                    env.set(JOINER, null);
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    writer.flush();
+                }
+            };
+            return op;
+        }
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
new file mode 100644
index 0000000..ff39cc8
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -0,0 +1,65 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
+public class SplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public SplitOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc, int outputArity) {
+        super(spec, 1, outputArity);
+        for (int i = 0; i < outputArity; i++) {
+            recordDescriptors[i] = rDesc;
+        }
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, IOperatorEnvironment env,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputOperatorNodePushable() {
+            private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+
+            @Override
+            public void close() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.close();
+                }
+            }
+
+            @Override
+            public void flush() throws HyracksDataException {
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    FrameUtils.flushFrame(bufferAccessor, writer);
+                }
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.open();
+                }
+            }
+
+            @Override
+            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                writers[index] = writer;
+            }
+        };
+    }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
new file mode 100644
index 0000000..f5ecde6
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.union;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+
+public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
+    public UnionAllOperatorDescriptor(JobSpecification spec, int nInputs, RecordDescriptor recordDescriptor) {
+        super(spec, nInputs, 1);
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        UnionActivityNode uba = new UnionActivityNode(new ActivityId(getOperatorId(), 0));
+        builder.addActivity(uba);
+        for (int i = 0; i < inputArity; ++i) {
+            builder.addSourceEdge(i, uba, i);
+        }
+        builder.addTargetEdge(0, uba, 0);
+    }
+
+    private class UnionActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public UnionActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                throws HyracksDataException {
+            RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            return new UnionOperator(ctx, inRecordDesc);
+        }
+    }
+
+    private class UnionOperator extends AbstractUnaryOutputOperatorNodePushable {
+        private int nOpened;
+
+        private int nClosed;
+
+        public UnionOperator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc) {
+            nOpened = 0;
+            nClosed = 0;
+        }
+
+        @Override
+        public int getInputArity() {
+            return inputArity;
+        }
+
+        @Override
+        public IFrameWriter getInputFrameWriter(int index) {
+            return new IFrameWriter() {
+                @Override
+                public void open() throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        if (++nOpened == 1) {
+                            writer.open();
+                        }
+                    }
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        writer.nextFrame(buffer);
+                    }
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        writer.flush();
+                    }
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        if (++nClosed == inputArity) {
+                            writer.close();
+                        }
+                    }
+                }
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
new file mode 100644
index 0000000..2b32142
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+
+public class SplitOperatorTest extends AbstractIntegrationTest {
+
+    public void compareFiles(String fileNameA, String fileNameB) throws IOException {
+        BufferedReader fileA = new BufferedReader(new FileReader(fileNameA));
+        BufferedReader fileB = new BufferedReader(new FileReader(fileNameB));
+
+        String lineA, lineB;
+        while ((lineA = fileA.readLine()) != null) {
+            lineB = fileB.readLine();
+            Assert.assertEquals(lineA, lineB);
+        }
+        Assert.assertNull(fileB.readLine());
+    }
+
+    @Test
+    public void test() throws Exception {
+        final int outputArity = 2;
+
+        JobSpecification spec = new JobSpecification();
+
+        String inputFileName = "data/words.txt";
+        File[] outputFile = new File[outputArity];
+        for (int i = 0; i < outputArity; i++) {
+            outputFile[i] = File.createTempFile("splitop", null);
+            outputFile[i].deleteOnExit();
+        }
+
+        FileSplit[] inputSplits = new FileSplit[] { new FileSplit(NC1_ID, inputFileName) };
+
+        String[] locations = new String[] { NC1_ID };
+
+        DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory(
+                new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000');
+        RecordDescriptor stringRec = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE, });
+
+        FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, new ConstantFileSplitProvider(
+                inputSplits), stringParser, stringRec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations);
+
+        SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp, locations);
+
+        IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
+        for (int i = 0; i < outputArity; i++) {
+            outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] { new FileSplit(NC1_ID,
+                    outputFile[i].getAbsolutePath()) });
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
+        }
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0);
+        for (int i = 0; i < outputArity; i++) {
+            spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0);
+        }
+
+        for (int i = 0; i < outputArity; i++) {
+            spec.addRoot(outputOp[i]);
+        }
+        runTest(spec);
+
+        for (int i = 0; i < outputArity; i++) {
+            compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index c0d0211..75a4830 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -14,7 +14,9 @@
  */
 package edu.uci.ics.hyracks.tests.integration;
 
+import java.io.DataOutput;
 import java.io.File;
+import java.io.IOException;
 
 import org.junit.Test;
 
@@ -23,8 +25,11 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
@@ -51,6 +56,29 @@
 public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
     private static final boolean DEBUG = true;
 
+    static private class NoopNullWriterFactory implements INullWriterFactory {
+
+        private static final long serialVersionUID = 1L;
+        public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory();
+
+        private NoopNullWriterFactory() {
+        }
+
+        @Override
+        public INullWriter createNullWriter() {
+            return new INullWriter() {
+                @Override
+                public void writeNull(DataOutput out) throws HyracksDataException {
+                    try {
+                        out.writeShort(0);
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            };
+        }
+    }
+
     /*
      * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL ); TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL, O_COMMENT VARCHAR(79) NOT NULL );
      */
@@ -274,6 +302,251 @@
     }
 
     @Test
+    public void customerOrderCIDInMemoryHashLeftOuterJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                "data/tpch0.001/orders.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        for (int j = 0; j < nullWriterFactories.length; j++) {
+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+        }
+
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
+                new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+                nullWriterFactories, 128);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+        //     "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+        // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDGraceHashLeftOuterJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                "data/tpch0.001/orders.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        for (int j = 0; j < nullWriterFactories.length; j++) {
+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+        }
+
+        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+                new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+                nullWriterFactories);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+        //     "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+        // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDHybridHashLeftOuterJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                "data/tpch0.001/orders.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        for (int j = 0; j < nullWriterFactories.length; j++) {
+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+        }
+
+        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+                new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+                nullWriterFactories);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+        //     "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+        // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
     public void customerOrderCIDJoinMulti() throws Exception {
         JobSpecification spec = new JobSpecification();
 
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
new file mode 100644
index 0000000..cab54f4
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -0,0 +1,335 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+
+public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest {
+    private static final boolean DEBUG = true;
+
+    static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+        private static final long serialVersionUID = 1L;
+
+        private final IBinaryComparatorFactory bFactory;
+        private final int pos0;
+        private final int pos1;
+
+        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
+            this.bFactory = bFactory;
+            this.pos0 = pos0;
+            this.pos1 = pos1;
+        }
+
+        @Override
+        public ITuplePairComparator createTuplePairComparator() {
+            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
+        }
+    }
+
+    static class JoinComparator implements ITuplePairComparator {
+
+        private final IBinaryComparator bComparator;
+        private final int field0;
+        private final int field1;
+
+        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
+            this.bComparator = bComparator;
+            this.field0 = field0;
+            this.field1 = field1;
+        }
+
+        @Override
+        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+            int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+            int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+            return 0;
+        }
+    }
+
+    /*
+     * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL,
+     * C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY
+     * INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT
+     * NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL );
+     * TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL,
+     * O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE
+     * DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY
+     * CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT
+     * NULL, O_COMMENT VARCHAR(79) NOT NULL );
+     */
+    @Test
+    public void customerOrderCIDJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/orders.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 4);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDJoinMulti() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 5);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDJoinAutoExpand() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 6);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
new file mode 100644
index 0000000..b1089fe
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+
+public class UnionTest extends AbstractIntegrationTest {
+    @Test
+    public void union01() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+                new FileSplit(NC2_ID, new FileReference(new File("data/words.txt"))),
+                new FileSplit(NC1_ID, new FileReference(new File("data/words.txt"))) });
+
+        RecordDescriptor desc = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor csvScanner01 = new FileScanOperatorDescriptor(
+                spec,
+                splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+                desc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner01, NC2_ID, NC1_ID);
+
+        FileScanOperatorDescriptor csvScanner02 = new FileScanOperatorDescriptor(
+                spec,
+                splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+                desc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner02, NC2_ID, NC1_ID);
+
+        UnionAllOperatorDescriptor unionAll = new UnionAllOperatorDescriptor(spec, 2, desc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, unionAll, NC2_ID, NC1_ID);
+
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner01, 0, unionAll, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner02, 0, unionAll, 1);
+        spec.connect(new OneToOneConnectorDescriptor(spec), unionAll, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
index 40cb7da..0d5dc8f 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
@@ -32,6 +32,8 @@
 
     public int getTokenLength();
 
+    public int getNumTokens();
+
     public void writeToken(DataOutput dos) throws IOException;
 
     public RecordDescriptor getTokenSchema();
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
index 73635f9..425436b 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
@@ -30,6 +30,7 @@
             new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
 
     private final char delimiter;
+    private final byte typeTag;
     private byte[] data;
     private int start;
     private int length;
@@ -38,8 +39,14 @@
     private int tokenStart;
     private int pos;
 
+    public DelimitedUTF8StringBinaryTokenizer(char delimiter, byte typeTag) {
+        this.delimiter = delimiter;
+        this.typeTag = typeTag;
+    }
+
     public DelimitedUTF8StringBinaryTokenizer(char delimiter) {
         this.delimiter = delimiter;
+        this.typeTag = -1;
     }
 
     @Override
@@ -88,6 +95,9 @@
 
     @Override
     public void writeToken(DataOutput dos) throws IOException {
+        if (typeTag > 0)
+            dos.write(typeTag);
+
         // WARNING: 2-byte length indicator is specific to UTF-8
         dos.writeShort((short) tokenLength);
         dos.write(data, tokenStart, tokenLength);
@@ -97,4 +107,10 @@
     public RecordDescriptor getTokenSchema() {
         return tokenSchema;
     }
+
+    // cannot be implemented for this tokenizer
+    @Override
+    public int getNumTokens() {
+        return -1;
+    }
 }
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
index e3e0be3..2e85db5 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
@@ -22,13 +22,20 @@
 
     private static final long serialVersionUID = 1L;
     private final char delimiter;
+    private final byte typeTag;
+
+    public DelimitedUTF8StringBinaryTokenizerFactory(char delimiter, byte typeTag) {
+        this.delimiter = delimiter;
+        this.typeTag = typeTag;
+    }
 
     public DelimitedUTF8StringBinaryTokenizerFactory(char delimiter) {
         this.delimiter = delimiter;
+        this.typeTag = -1;
     }
 
     @Override
     public IBinaryTokenizer createBinaryTokenizer() {
-        return new DelimitedUTF8StringBinaryTokenizer(delimiter);
+        return new DelimitedUTF8StringBinaryTokenizer(delimiter, typeTag);
     }
 }
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
index 54fc371..2cc0b6c 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
@@ -142,4 +142,9 @@
     public RecordDescriptor getTokenSchema() {
         return tokenSchema;
     }
-}
+
+    @Override
+    public int getNumTokens() {
+        return 0;
+    }
+}
\ No newline at end of file