Merged hyracks_aqua_changes@405:431 into trunk

git-svn-id: https://hyracks.googlecode.com/svn/trunk@432 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index a431939..0987bd2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -16,6 +16,7 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
 import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
 
@@ -25,4 +26,6 @@
     public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory);
 
     public void addJobLifecycleListener(IJobLifecycleListener jobLifecycleListener);
+    
+    public ICCContext getCCContext();
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
new file mode 100644
index 0000000..266ebe9
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/ICCContext.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.context;
+
+import java.util.Map;
+import java.util.Set;
+
+public interface ICCContext {
+    public Map<String, Set<String>> getIPAddressNodeMap();
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
index 362306a..3dcf620 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -93,7 +94,8 @@
      * @param plan
      *            - Job Plan
      */
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan);
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx);
 
     /**
      * Translate this connector descriptor to JSON.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index e58ac99..9e8dbcb 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobPlan;
@@ -73,7 +74,8 @@
      * @param plan
      *            - Job Plan
      */
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan);
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx);
 
     /**
      * Translates this operator descriptor to JSON.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
new file mode 100644
index 0000000..7552c17
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriter.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface INullWriter {
+    public void writeNull(DataOutput out) throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
new file mode 100644
index 0000000..6d9a744
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/INullWriterFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INullWriterFactory extends Serializable {
+    public INullWriter createNullWriter();
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
new file mode 100644
index 0000000..3251944
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparator.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITuplePairComparator {
+
+    public int compare(IFrameTupleAccessor outerRef, int outerIndex, IFrameTupleAccessor innerRef, int innerIndex)
+            throws HyracksDataException;
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
new file mode 100644
index 0000000..26cb525
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/ITuplePairComparatorFactory.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface ITuplePairComparatorFactory extends Serializable {
+
+    public ITuplePairComparator createTuplePairComparator();
+}
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index a4aa482..e5efe92 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -23,6 +23,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
@@ -33,9 +34,11 @@
 
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
 import edu.uci.ics.hyracks.api.client.IHyracksClientInterface;
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.control.CCConfig;
 import edu.uci.ics.hyracks.api.control.IClusterController;
 import edu.uci.ics.hyracks.api.control.INodeController;
+import edu.uci.ics.hyracks.api.control.NCConfig;
 import edu.uci.ics.hyracks.api.control.NodeParameters;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobFlag;
@@ -76,6 +79,8 @@
 
     private final Map<String, NodeControllerState> nodeRegistry;
 
+    private final Map<String, Set<String>> ipAddressNodeNameMap;
+
     private final Map<String, CCApplicationContext> applications;
 
     private final ServerContext serverCtx;
@@ -94,9 +99,12 @@
 
     private final Timer timer;
 
+    private final ICCContext ccContext;
+
     public ClusterControllerService(CCConfig ccConfig) throws Exception {
         this.ccConfig = ccConfig;
         nodeRegistry = new LinkedHashMap<String, NodeControllerState>();
+        ipAddressNodeNameMap = new HashMap<String, Set<String>>();
         applications = new Hashtable<String, CCApplicationContext>();
         serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(
                 ClusterControllerService.class.getName()));
@@ -106,6 +114,12 @@
         jobQueue = new JobQueue();
         scheduler = new NaiveScheduler(this);
         this.timer = new Timer(true);
+        ccContext = new ICCContext() {
+            @Override
+            public Map<String, Set<String>> getIPAddressNodeMap() {
+                return ipAddressNodeNameMap;
+            }
+        };
     }
 
     @Override
@@ -153,6 +167,10 @@
         return nodeRegistry;
     }
 
+    public Map<String, Set<String>> getIPAddressNodeNameMap() {
+        return ipAddressNodeNameMap;
+    }
+
     public CCConfig getConfig() {
         return ccConfig;
     }
@@ -169,7 +187,8 @@
     @Override
     public NodeParameters registerNode(INodeController nodeController) throws Exception {
         String id = nodeController.getId();
-        NodeControllerState state = new NodeControllerState(nodeController);
+        NCConfig ncConfig = nodeController.getConfiguration();
+        NodeControllerState state = new NodeControllerState(nodeController, ncConfig);
         jobQueue.scheduleAndSync(new RegisterNodeEvent(this, id, state));
         nodeController.notifyRegistration(this);
         LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
@@ -242,7 +261,7 @@
             if (applications.containsKey(appName)) {
                 throw new HyracksException("Duplicate application with name: " + appName + " being created.");
             }
-            CCApplicationContext appCtx = new CCApplicationContext(serverCtx, appName);
+            CCApplicationContext appCtx = new CCApplicationContext(serverCtx, ccContext, appName);
             applications.put(appName, appCtx);
         }
     }
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index e69281b..40eb9ae 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -5,16 +5,20 @@
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.control.INodeController;
+import edu.uci.ics.hyracks.api.control.NCConfig;
 
 public class NodeControllerState {
     private final INodeController nodeController;
 
+    private final NCConfig ncConfig;
+
     private final Set<UUID> activeJobIds;
 
     private int lastHeartbeatDuration;
 
-    public NodeControllerState(INodeController nodeController) {
+    public NodeControllerState(INodeController nodeController, NCConfig ncConfig) {
         this.nodeController = nodeController;
+        this.ncConfig = ncConfig;
         activeJobIds = new HashSet<UUID>();
     }
 
@@ -34,6 +38,10 @@
         return nodeController;
     }
 
+    public NCConfig getNCConfig() {
+        return ncConfig;
+    }
+
     public Set<UUID> getActiveJobIds() {
         return activeJobIds;
     }
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
index 5ca0269..96e29d3 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/application/CCApplicationContext.java
@@ -8,6 +8,7 @@
 
 import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.application.ICCBootstrap;
+import edu.uci.ics.hyracks.api.context.ICCContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IJobLifecycleListener;
 import edu.uci.ics.hyracks.api.job.IJobSpecificationFactory;
@@ -17,12 +18,15 @@
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 
 public class CCApplicationContext extends ApplicationContext implements ICCApplicationContext {
+    private final ICCContext ccContext;
+
     private IJobSpecificationFactory jobSpecFactory;
 
     private List<IJobLifecycleListener> jobLifecycleListeners;
 
-    public CCApplicationContext(ServerContext serverCtx, String appName) throws IOException {
+    public CCApplicationContext(ServerContext serverCtx, ICCContext ccContext, String appName) throws IOException {
         super(serverCtx, appName);
+        this.ccContext = ccContext;
         jobSpecFactory = DeserializingJobSpecificationFactory.INSTANCE;
         jobLifecycleListeners = new ArrayList<IJobLifecycleListener>();
     }
@@ -33,6 +37,10 @@
         bootstrap.start();
     }
 
+    public ICCContext getCCContext() {
+        return ccContext;
+    }
+
     @Override
     public void setJobSpecificationFactory(IJobSpecificationFactory jobSpecFactory) {
         this.jobSpecFactory = jobSpecFactory;
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
index 3b03d25..d7f244e 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
@@ -60,7 +60,7 @@
             throw new HyracksException("No application with id " + appName + " found");
         }
         JobSpecification spec = appCtx.createJobSpecification(jobId, jobSpec);
-        JobRun run = plan(jobId, spec, jobFlags);
+        JobRun run = plan(jobId, spec, appCtx, jobFlags);
         run.setStatus(JobStatus.INITIALIZED);
 
         ccs.getRunMap().put(jobId, run);
@@ -71,7 +71,8 @@
         return jobId;
     }
 
-    private JobRun plan(UUID jobId, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+    private JobRun plan(UUID jobId, JobSpecification jobSpec, final CCApplicationContext appCtx,
+            EnumSet<JobFlag> jobFlags) throws Exception {
         final JobPlanBuilder builder = new JobPlanBuilder();
         builder.init(appName, jobId, jobSpec, jobFlags);
         PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
@@ -92,13 +93,13 @@
         PlanUtils.visit(jobSpec, new IOperatorDescriptorVisitor() {
             @Override
             public void visit(IOperatorDescriptor op) {
-                op.contributeSchedulingConstraints(acceptor, plan);
+                op.contributeSchedulingConstraints(acceptor, plan, appCtx);
             }
         });
         PlanUtils.visit(jobSpec, new IConnectorDescriptorVisitor() {
             @Override
             public void visit(IConnectorDescriptor conn) {
-                conn.contributeSchedulingConstraints(acceptor, plan);
+                conn.contributeSchedulingConstraints(acceptor, plan, appCtx);
             }
         });
         contributedConstraints.addAll(jobSpec.getUserConstraints());
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
index 0754dc1..3df302f 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterNodeEvent.java
@@ -14,7 +14,9 @@
  */
 package edu.uci.ics.hyracks.control.cc.job.manager.events;
 
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
 import edu.uci.ics.hyracks.control.cc.NodeControllerState;
@@ -38,5 +40,13 @@
             throw new Exception("Node with this name already registered.");
         }
         nodeMap.put(nodeId, state);
+        Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
+        String ipAddress = state.getNCConfig().dataIPAddress;
+        Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
+        if (nodes == null) {
+            nodes = new HashSet<String>();
+            ipAddressNodeNameMap.put(ipAddress, nodes);
+        }
+        nodes.add(nodeId);
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
index 38fb7ae..dca0dee 100644
--- a/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
+++ b/hyracks/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RemoveDeadNodesEvent.java
@@ -44,6 +44,7 @@
                 LOGGER.info(e.getKey() + " considered dead");
             }
         }
+        Map<String, Set<String>> ipAddressNodeNameMap = ccs.getIPAddressNodeNameMap();
         for (String deadNode : deadNodes) {
             NodeControllerState state = nodeMap.remove(deadNode);
             for (final UUID jid : state.getActiveJobIds()) {
@@ -52,6 +53,13 @@
                 LOGGER.info("Aborting: " + jid);
                 ccs.getJobQueue().schedule(new JobAbortEvent(ccs, jid, lastAttempt));
             }
+            String ipAddress = state.getNCConfig().dataIPAddress;
+            Set<String> ipNodes = ipAddressNodeNameMap.get(ipAddress);
+            if (ipNodes != null) {
+                if (ipNodes.remove(deadNode) && ipNodes.isEmpty()) {
+                    ipAddressNodeNameMap.remove(ipAddress);
+                }
+            }
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 6cc7450..413243e 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -16,6 +16,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.Hashtable;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
@@ -65,9 +66,9 @@
         this.appCtx = appCtx;
         this.jobId = jobId;
         this.attempt = attempt;
-        stageletMap = new HashMap<UUID, Stagelet>();
-        envMap = new HashMap<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
-        counterMap = new HashMap<String, Counter>();
+        stageletMap = new Hashtable<UUID, Stagelet>();
+        envMap = new Hashtable<OperatorDescriptorId, Map<Integer, IOperatorEnvironment>>();
+        counterMap = new Hashtable<String, Counter>();
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new ManagedWorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
     }
@@ -78,8 +79,10 @@
     }
 
     public IOperatorEnvironment getEnvironment(IOperatorDescriptor hod, int partition) {
-        if (!envMap.containsKey(hod.getOperatorId())) {
-            envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+        synchronized (envMap) {
+            if (!envMap.containsKey(hod.getOperatorId())) {
+                envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+            }
         }
         Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
         if (!opEnvMap.containsKey(partition)) {
@@ -118,7 +121,7 @@
         return nodeController.getExecutor();
     }
 
-    public synchronized void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
+    public void notifyStageletComplete(UUID stageId, int attempt, StageletProfile stats) throws Exception {
         stageletMap.remove(stageId);
         nodeController.notifyStageComplete(jobId, stageId, attempt, stats);
     }
@@ -132,15 +135,19 @@
         return nodeController;
     }
 
-    public synchronized void dumpProfile(JobletProfile jProfile) {
-        Map<String, Long> counters = jProfile.getCounters();
-        for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
-            counters.put(e.getKey(), e.getValue().get());
+    public void dumpProfile(JobletProfile jProfile) {
+        synchronized (counterMap) {
+            Map<String, Long> counters = jProfile.getCounters();
+            for (Map.Entry<String, Counter> e : counterMap.entrySet()) {
+                counters.put(e.getKey(), e.getValue().get());
+            }
         }
-        for (Stagelet si : stageletMap.values()) {
-            StageletProfile sProfile = new StageletProfile(si.getStageId());
-            si.dumpProfile(sProfile);
-            jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
+        synchronized (stageletMap) {
+            for (Stagelet si : stageletMap.values()) {
+                StageletProfile sProfile = new StageletProfile(si.getStageId());
+                si.dumpProfile(sProfile);
+                jProfile.getStageletProfiles().put(si.getStageId(), sProfile);
+            }
         }
     }
 
@@ -193,12 +200,14 @@
     }
 
     @Override
-    public synchronized ICounter getCounter(String name, boolean create) {
-        Counter counter = counterMap.get(name);
-        if (counter == null && create) {
-            counter = new Counter(name);
-            counterMap.put(name, counter);
+    public ICounter getCounter(String name, boolean create) {
+        synchronized (counterMap) {
+            Counter counter = counterMap.get(name);
+            if (counter == null && create) {
+                counter = new Counter(name);
+                counterMap.put(name, counter);
+            }
+            return counter;
         }
-        return counter;
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 2702386..293e626 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -129,7 +129,7 @@
         }
         nodeCapability = computeNodeCapability();
         connectionManager = new ConnectionManager(ctx, getIpAddress(ncConfig));
-        jobletMap = new HashMap<UUID, Joblet>();
+        jobletMap = new Hashtable<UUID, Joblet>();
         timer = new Timer(true);
         serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
                 NodeControllerService.class.getName()), id));
@@ -251,13 +251,11 @@
                 for (int i : tasks.get(hanId)) {
                     IOperatorNodePushable hon = han.createPushRuntime(stagelet, joblet.getEnvironment(op, i), rdp, i,
                             opNumPartitions.get(op.getOperatorId()));
-                    OperatorRunnable or = new OperatorRunnable(stagelet, hon);
+                    OperatorRunnable or = new OperatorRunnable(stagelet, hon, inputs == null ? 0 : inputs.size(),
+                            executor);
                     stagelet.setOperator(op.getOperatorId(), i, or);
                     if (inputs != null) {
                         for (int j = 0; j < inputs.size(); ++j) {
-                            if (j >= 1) {
-                                throw new IllegalStateException();
-                            }
                             IConnectorDescriptor conn = inputs.get(j);
                             OperatorDescriptorId producerOpId = plan.getJobSpecification().getProducer(conn)
                                     .getOperatorId();
@@ -276,7 +274,7 @@
                             portMap.put(piId, endpoint);
                             IFrameReader reader = createReader(stagelet, conn, drlf, i, plan, stagelet,
                                     opNumPartitions.get(producerOpId), opNumPartitions.get(consumerOpId));
-                            or.setFrameReader(reader);
+                            or.setFrameReader(j, reader);
                         }
                     }
                     honMap.put(new OperatorInstanceId(op.getOperatorId(), i), or);
@@ -438,19 +436,20 @@
         si.setEndpointList(null);
     }
 
-    private synchronized Joblet getLocalJoblet(UUID jobId) throws Exception {
+    private Joblet getLocalJoblet(UUID jobId) throws Exception {
         Joblet ji = jobletMap.get(jobId);
         return ji;
     }
 
-    private synchronized Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx)
-            throws Exception {
-        Joblet ji = jobletMap.get(jobId);
-        if (ji == null || ji.getAttempt() != attempt) {
-            ji = new Joblet(this, jobId, attempt, appCtx);
-            jobletMap.put(jobId, ji);
+    private Joblet getOrCreateLocalJoblet(UUID jobId, int attempt, INCApplicationContext appCtx) throws Exception {
+        synchronized (jobletMap) {
+            Joblet ji = jobletMap.get(jobId);
+            if (ji == null || ji.getAttempt() != attempt) {
+                ji = new Joblet(this, jobId, attempt, appCtx);
+                jobletMap.put(jobId, ji);
+            }
+            return ji;
         }
-        return ji;
     }
 
     public Executor getExecutor() {
@@ -458,7 +457,7 @@
     }
 
     @Override
-    public synchronized void cleanUpJob(UUID jobId) throws Exception {
+    public void cleanUpJob(UUID jobId) throws Exception {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Cleaning up after job: " + jobId);
         }
@@ -563,7 +562,7 @@
     }
 
     @Override
-    public synchronized void abortJoblet(UUID jobId, int attempt) throws Exception {
+    public void abortJoblet(UUID jobId, int attempt) throws Exception {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Aborting Job: " + jobId + ":" + attempt);
         }
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
index eb283c2..68b2cad 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Stagelet.java
@@ -146,9 +146,13 @@
         });
     }
 
-    protected synchronized void notifyOperatorCompletion(OperatorInstanceId opIId) {
-        pendingOperators.remove(opIId);
-        if (pendingOperators.isEmpty()) {
+    protected void notifyOperatorCompletion(OperatorInstanceId opIId) {
+        boolean done = false;
+        synchronized (pendingOperators) {
+            pendingOperators.remove(opIId);
+            done = pendingOperators.isEmpty();
+        }
+        if (done) {
             try {
                 StageletProfile sProfile = new StageletProfile(stageId);
                 dumpProfile(sProfile);
@@ -227,7 +231,7 @@
     }
 
     @Override
-    public ICounter getCounter(String name, boolean create) {
+    public synchronized ICounter getCounter(String name, boolean create) {
         Counter counter = counterMap.get(name);
         if (counter == null && create) {
             counter = new Counter(name);
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
index eca7fd0..8edd992 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/OperatorRunnable.java
@@ -15,6 +15,8 @@
 package edu.uci.ics.hyracks.control.nc.runtime;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
 
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -24,22 +26,27 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public class OperatorRunnable implements Runnable {
+    private final IHyracksStageletContext ctx;
     private final IOperatorNodePushable opNode;
-    private IFrameReader reader;
-    private ByteBuffer buffer;
+    private final int nInputs;
+    private final Executor executor;
+    private IFrameReader[] readers;
     private volatile boolean abort;
 
-    public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode) {
+    public OperatorRunnable(IHyracksStageletContext ctx, IOperatorNodePushable opNode, int nInputs, Executor executor) {
+        this.ctx = ctx;
         this.opNode = opNode;
-        buffer = ctx.allocateFrame();
+        this.nInputs = nInputs;
+        this.executor = executor;
+        readers = new IFrameReader[nInputs];
     }
 
     public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         opNode.setOutputFrameWriter(index, writer, recordDesc);
     }
 
-    public void setFrameReader(IFrameReader reader) {
-        this.reader = reader;
+    public void setFrameReader(int inputIdx, IFrameReader reader) {
+        this.readers[inputIdx] = reader;
     }
 
     public void abort() {
@@ -50,20 +57,28 @@
     public void run() {
         try {
             opNode.initialize();
-            if (reader != null) {
-                IFrameWriter writer = opNode.getInputFrameWriter(0);
-                writer.open();
-                reader.open();
-                while (readFrame()) {
-                    if (abort) {
-                        break;
-                    }
-                    buffer.flip();
-                    writer.nextFrame(buffer);
-                    buffer.compact();
+            if (nInputs > 0) {
+                final Semaphore sem = new Semaphore(nInputs - 1);
+                for (int i = 1; i < nInputs; ++i) {
+                    final IFrameReader reader = readers[i];
+                    final IFrameWriter writer = opNode.getInputFrameWriter(i);
+                    sem.acquire();
+                    executor.execute(new Runnable() {
+                        public void run() {
+                            try {
+                                pushFrames(reader, writer);
+                            } catch (HyracksDataException e) {
+                            } finally {
+                                sem.release();
+                            }
+                        }
+                    });
                 }
-                reader.close();
-                writer.close();
+                try {
+                    pushFrames(readers[0], opNode.getInputFrameWriter(0));
+                } finally {
+                    sem.acquire(nInputs - 1);
+                }
             }
             opNode.deinitialize();
         } catch (Exception e) {
@@ -71,8 +86,20 @@
         }
     }
 
-    protected boolean readFrame() throws HyracksDataException {
-        return reader.nextFrame(buffer);
+    private void pushFrames(IFrameReader reader, IFrameWriter writer) throws HyracksDataException {
+        ByteBuffer buffer = ctx.allocateFrame();
+        writer.open();
+        reader.open();
+        while (reader.nextFrame(buffer)) {
+            if (abort) {
+                break;
+            }
+            buffer.flip();
+            writer.nextFrame(buffer);
+            buffer.compact();
+        }
+        reader.close();
+        writer.close();
     }
 
     @Override
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 4ec1b6f..7647e50 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -118,30 +118,32 @@
         return false;
     }
     
-    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots, byte[] bytes, int offset,
-            int length) {
+    public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1,
+            int dataLen1) {
         int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
         int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
         int length0 = endOffset0 - startOffset0;
 
-        if (tupleDataEndOffset + length0 + fieldSlots.length * 4 + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+        int slotsLen1 = fieldSlots1.length * 4;
+        int length1 = slotsLen1 + dataLen1;
+        
+        if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
             ByteBuffer src0 = accessor0.getBuffer();
             int slotsLen0 = accessor0.getFieldSlotsLength();
             int dataLen0 = length0 - slotsLen0;
             // Copy slots from accessor0 verbatim
             System.arraycopy(src0.array(), startOffset0, buffer.array(), tupleDataEndOffset, slotsLen0);
-            // Copy slots from fieldSlots with the following transformation:
-            // newSlotIdx = oldSlotIdx + dataLen0
-            for (int i = 0; i < fieldSlots.length; ++i) {
-                buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots[i] + dataLen0));
+            // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+            for (int i = 0; i < fieldSlots1.length; ++i) {
+                buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0));
             }
             // Copy data0
             System.arraycopy(src0.array(), startOffset0 + slotsLen0, buffer.array(), tupleDataEndOffset + slotsLen0
-                    + fieldSlots.length * 4, dataLen0);
-            // Copy bytes
-            System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + slotsLen0 + fieldSlots.length * 4
-                    + dataLen0, length);
-            tupleDataEndOffset += (length0 + fieldSlots.length * 4 + length);
+                    + slotsLen1, dataLen0);
+            // Copy bytes1
+            System.arraycopy(bytes1, offset1, buffer.array(), tupleDataEndOffset + slotsLen0 + fieldSlots1.length * 4
+                    + dataLen0, dataLen1);
+            tupleDataEndOffset += (length0 + length1);
             buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
             buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
@@ -150,6 +152,41 @@
         return false;
     }
 
+    public boolean appendConcat(int[] fieldSlots0, byte[] bytes0, int offset0, int dataLen0, IFrameTupleAccessor accessor1,
+            int tIndex1) {
+        int slotsLen0 = fieldSlots0.length * 4;
+        int length0 = slotsLen0 + dataLen0;
+        
+        int startOffset1 = accessor1.getTupleStartOffset(tIndex1);
+        int endOffset1 = accessor1.getTupleEndOffset(tIndex1);
+        int length1 = endOffset1 - startOffset1;
+        
+        if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+            ByteBuffer src1 = accessor1.getBuffer();
+            int slotsLen1 = accessor1.getFieldSlotsLength();
+            int dataLen1 = length1 - slotsLen1;
+            // Copy fieldSlots0 verbatim
+            for (int i = 0; i < fieldSlots0.length; ++i) {
+                buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots0[i]);
+            }
+            // Copy slots from accessor1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+            for (int i = 0; i < slotsLen1 / 4; ++i) {
+                buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, src1.getInt(startOffset1 + i * 4) + dataLen0);
+            }
+            // Copy bytes0
+            System.arraycopy(bytes0, offset0, buffer.array(), tupleDataEndOffset + slotsLen0 + slotsLen1, 
+                    dataLen0);
+            // Copy data1
+            System.arraycopy(src1.array(), startOffset1 + slotsLen1, buffer.array(), tupleDataEndOffset + slotsLen0 
+                    + slotsLen1 + dataLen0, dataLen1);
+            tupleDataEndOffset += (length0 + length1);
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+            ++tupleCount;
+            buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+            return true;
+        }
+        return false;
+    }
     public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) {
         int fTargetSlotsLength = fields.length * 4;
         int length = fTargetSlotsLength;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 0eebd93..e4613dd 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -50,7 +51,8 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx) {
         // do nothing
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 7e97975..4453412 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -19,6 +19,7 @@
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
@@ -68,7 +69,8 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx) {
         // do nothing
     }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index a038a40..6ee9ab37 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.connectors;
 
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
 import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
 import edu.uci.ics.hyracks.api.comm.IFrameReader;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -52,7 +53,8 @@
     }
 
     @Override
-    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+    public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+            ICCApplicationContext appCtx) {
         JobSpecification jobSpec = plan.getJobSpecification();
         IOperatorDescriptor consumer = jobSpec.getConsumer(this);
         IOperatorDescriptor producer = jobSpec.getProducer(this);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index bff101e..1a69952 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -44,8 +46,8 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final String SMALLRELATION = "RelR";
-    private static final String LARGERELATION = "RelS";
+    private static final String RELATION0 = "Rel0";
+    private static final String RELATION1 = "Rel1";
 
     private static final long serialVersionUID = 1L;
     private final int[] keys0;
@@ -56,6 +58,8 @@
     private final double factor;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
 
     public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
@@ -69,24 +73,44 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = false;
+        this.nullWriterFactories1 = null;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1) {
+        super(spec, 2, 1);
+        this.memsize = memsize;
+        this.inputsize0 = inputsize0;
+        this.recordsPerFrame = recordsPerFrame;
+        this.factor = factor;
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
         recordDescriptors[0] = recordDescriptor;
     }
 
     @Override
     public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        HashPartitionActivityNode rpart = new HashPartitionActivityNode(SMALLRELATION, keys0, 0);
-        HashPartitionActivityNode spart = new HashPartitionActivityNode(LARGERELATION, keys1, 1);
+        HashPartitionActivityNode part0 = new HashPartitionActivityNode(RELATION0, keys0, 0);
+        HashPartitionActivityNode part1 = new HashPartitionActivityNode(RELATION1, keys1, 1);
         JoinActivityNode join = new JoinActivityNode();
 
-        builder.addTask(rpart);
-        builder.addSourceEdge(0, rpart, 0);
+        builder.addTask(part0);
+        builder.addSourceEdge(0, part0, 0);
 
-        builder.addTask(spart);
-        builder.addSourceEdge(1, spart, 0);
+        builder.addTask(part1);
+        builder.addSourceEdge(1, part1, 0);
 
         builder.addTask(join);
-        builder.addBlockingEdge(rpart, spart);
-        builder.addBlockingEdge(spart, join);
+        builder.addBlockingEdge(part0, part1);
+        builder.addBlockingEdge(part1, join);
 
         builder.addTargetEdge(0, join, 0);
     }
@@ -217,18 +241,24 @@
             }
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
 
             IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
                 private InMemoryHashJoin joiner;
 
-                private RunFileWriter[] rWriters;
-                private RunFileWriter[] sWriters;
+                private RunFileWriter[] buildWriters;
+                private RunFileWriter[] probeWriters;
                 private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
 
                 @Override
                 public void initialize() throws HyracksDataException {
-                    rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
-                    sWriters = (RunFileWriter[]) env.get(LARGERELATION);
+                    buildWriters = (RunFileWriter[]) env.get(RELATION1);
+                    probeWriters = (RunFileWriter[]) env.get(RELATION0);
 
                     ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
                             new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
@@ -241,34 +271,36 @@
                     // buffer
                     int tableSize = (int) (numPartitions * recordsPerFrame * factor);
                     for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
-                        RunFileWriter rWriter = rWriters[partitionid];
-                        RunFileWriter sWriter = sWriters[partitionid];
-                        if (rWriter == null || sWriter == null) {
+                        RunFileWriter buildWriter = buildWriters[partitionid];
+                        RunFileWriter probeWriter = probeWriters[partitionid];
+                        if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
                             continue;
                         }
                         joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                                 hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
-                                new FrameTuplePairComparator(keys0, keys1, comparators));
+                                new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
 
                         // build
-                        RunFileReader rReader = rWriter.createReader();
-                        rReader.open();
-                        while (rReader.nextFrame(buffer)) {
-                            ByteBuffer copyBuffer = ctx.allocateFrame();
-                            FrameUtils.copy(buffer, copyBuffer);
-                            joiner.build(copyBuffer);
-                            buffer.clear();
+                        if (buildWriter != null) {
+                            RunFileReader buildReader = buildWriter.createReader();
+                            buildReader.open();
+                            while (buildReader.nextFrame(buffer)) {
+                                ByteBuffer copyBuffer = ctx.allocateFrame();
+                                FrameUtils.copy(buffer, copyBuffer);
+                                joiner.build(copyBuffer);
+                                buffer.clear();
+                            }
+                            buildReader.close();
                         }
-                        rReader.close();
 
                         // probe
-                        RunFileReader sReader = sWriter.createReader();
-                        sReader.open();
-                        while (sReader.nextFrame(buffer)) {
+                        RunFileReader probeReader = probeWriter.createReader();
+                        probeReader.open();
+                        while (probeReader.nextFrame(buffer)) {
                             joiner.join(buffer, writer);
                             buffer.clear();
                         }
-                        sReader.close();
+                        probeReader.close();
                         joiner.closeJoin(writer);
                     }
                     writer.close();
@@ -276,8 +308,8 @@
 
                 @Override
                 public void deinitialize() throws HyracksDataException {
-                    env.set(LARGERELATION, null);
-                    env.set(SMALLRELATION, null);
+                    env.set(RELATION1, null);
+                    env.set(RELATION0, null);
                 }
             };
             return op;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index d7c1086..516bb88 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -46,19 +48,21 @@
 
 public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final String JOINER0 = "joiner0";
-    private static final String SMALLRELATION = "RelR";
-    private static final String LARGERELATION = "RelS";
+    private static final String BUILDRELATION = "BuildRel";
+    private static final String PROBERELATION = "ProbeRel";
     private static final String MEM_HASHTABLE = "MEMORY_HASHTABLE";
     private static final String NUM_PARTITION = "NUMBER_B_PARTITIONS"; // B
     private final int memsize;
     private static final long serialVersionUID = 1L;
     private final int inputsize0;
     private final double factor;
+    private final int recordsPerFrame;
     private final int[] keys0;
     private final int[] keys1;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
-    private final int recordsPerFrame;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
 
     /**
      * @param spec
@@ -88,19 +92,39 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = false;
+        this.nullWriterFactories1 = null;
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    public HybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+            double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+            INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+        super(spec, 2, 1);
+        this.memsize = memsize;
+        this.inputsize0 = inputsize0;
+        this.factor = factor;
+        this.recordsPerFrame = recordsPerFrame;
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
         recordDescriptors[0] = recordDescriptor;
     }
 
     @Override
     public void contributeTaskGraph(IActivityGraphBuilder builder) {
-        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(SMALLRELATION);
-        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(LARGERELATION);
+        BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(BUILDRELATION);
+        PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(PROBERELATION);
 
         builder.addTask(phase1);
-        builder.addSourceEdge(0, phase1, 0);
+        builder.addSourceEdge(1, phase1, 0);
 
         builder.addTask(phase2);
-        builder.addSourceEdge(1, phase2, 0);
+        builder.addSourceEdge(0, phase2, 0);
 
         builder.addBlockingEdge(phase1, phase2);
 
@@ -127,12 +151,18 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private InMemoryHashJoin joiner0;
-                private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
-                ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
-                        .createPartitioner();
+                private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
+                private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
+                        hashFunctionFactories).createPartitioner();
                 private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
                 private ByteBuffer[] bufferForPartitions;
@@ -148,8 +178,8 @@
 
                     for (int i = 0; i < B; i++) {
                         ByteBuffer buf = bufferForPartitions[i];
-                        accessor0.reset(buf);
-                        if (accessor0.getTupleCount() > 0) {
+                        accessorBuild.reset(buf);
+                        if (accessorBuild.getTupleCount() > 0) {
                             write(i, buf);
                         }
                         closeWriter(i);
@@ -165,17 +195,17 @@
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
 
                     if (memoryForHashtable != memsize - 2) {
-                        accessor0.reset(buffer);
-                        int tCount = accessor0.getTupleCount();
+                        accessorBuild.reset(buffer);
+                        int tCount = accessorBuild.getTupleCount();
                         for (int i = 0; i < tCount; ++i) {
                             int entry = -1;
                             if (memoryForHashtable == 0) {
-                                entry = hpc0.partition(accessor0, i, B);
+                                entry = hpcBuild.partition(accessorBuild, i, B);
                                 boolean newBuffer = false;
                                 ByteBuffer bufBi = bufferForPartitions[entry];
                                 while (true) {
                                     appender.reset(bufBi, newBuffer);
-                                    if (appender.append(accessor0, i)) {
+                                    if (appender.append(accessorBuild, i)) {
                                         break;
                                     } else {
                                         write(entry, bufBi);
@@ -184,15 +214,16 @@
                                     }
                                 }
                             } else {
-                                entry = hpc0.partition(accessor0, i, (int) (inputsize0 * factor / nPartitions));
+                                entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
                                 if (entry < memoryForHashtable) {
                                     while (true) {
-                                        if (!ftappender.append(accessor0, i)) {
+                                        if (!ftappender.append(accessorBuild, i)) {
                                             build(inBuffer);
 
                                             ftappender.reset(inBuffer, true);
-                                        } else
+                                        } else {
                                             break;
+                                        }
                                     }
                                 } else {
                                     entry %= B;
@@ -200,7 +231,7 @@
                                     ByteBuffer bufBi = bufferForPartitions[entry];
                                     while (true) {
                                         appender.reset(bufBi, newBuffer);
-                                        if (appender.append(accessor0, i)) {
+                                        if (appender.append(accessorBuild, i)) {
                                             break;
                                         } else {
                                             write(entry, bufBi);
@@ -255,7 +286,7 @@
                     int tableSize = (int) (memoryForHashtable * recordsPerFrame * factor);
                     joiner0 = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                             hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
-                                    keys0, keys1, comparators));
+                                    keys0, keys1, comparators), isLeftOuter, nullWriters1);
                     bufferForPartitions = new ByteBuffer[B];
                     fWriters = new RunFileWriter[B];
                     for (int i = 0; i < B; i++) {
@@ -298,11 +329,11 @@
 
     private class PartitionAndJoinActivityNode extends AbstractActivityNode {
         private static final long serialVersionUID = 1L;
-        private String largeRelation;
+        private String relationName;
 
         public PartitionAndJoinActivityNode(String relationName) {
             super();
-            this.largeRelation = relationName;
+            this.relationName = relationName;
         }
 
         @Override
@@ -315,22 +346,28 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private InMemoryHashJoin joiner0;
-                private final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
-                private ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
+                private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+                private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
                         hashFunctionFactories);
-                private ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
+                private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
                         hashFunctionFactories);
-                ITuplePartitionComputer hpc1 = hpcf1.createPartitioner();
+                private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
 
                 private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
                 private final FrameTupleAppender ftap = new FrameTupleAppender(ctx.getFrameSize());
                 private final ByteBuffer inBuffer = ctx.allocateFrame();
                 private final ByteBuffer outBuffer = ctx.allocateFrame();
-                private RunFileWriter[] rWriters;
-                private RunFileWriter[] sWriters;
+                private RunFileWriter[] buildWriters;
+                private RunFileWriter[] probeWriters;
                 private ByteBuffer[] bufferForPartitions;
                 private int B;
                 private int memoryForHashtable;
@@ -339,10 +376,10 @@
                 public void open() throws HyracksDataException {
                     joiner0 = (InMemoryHashJoin) env.get(JOINER0);
                     writer.open();
-                    rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
+                    buildWriters = (RunFileWriter[]) env.get(BUILDRELATION);
                     B = (Integer) env.get(NUM_PARTITION);
                     memoryForHashtable = (Integer) env.get(MEM_HASHTABLE);
-                    sWriters = new RunFileWriter[B];
+                    probeWriters = new RunFileWriter[B];
                     bufferForPartitions = new ByteBuffer[B];
                     for (int i = 0; i < B; i++) {
                         bufferForPartitions[i] = ctx.allocateFrame();
@@ -354,18 +391,18 @@
                 @Override
                 public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                     if (memoryForHashtable != memsize - 2) {
-                        accessor1.reset(buffer);
-                        int tupleCount1 = accessor1.getTupleCount();
-                        for (int i = 0; i < tupleCount1; ++i) {
+                        accessorProbe.reset(buffer);
+                        int tupleCount0 = accessorProbe.getTupleCount();
+                        for (int i = 0; i < tupleCount0; ++i) {
 
                             int entry = -1;
                             if (memoryForHashtable == 0) {
-                                entry = hpc1.partition(accessor1, i, B);
+                                entry = hpcProbe.partition(accessorProbe, i, B);
                                 boolean newBuffer = false;
                                 ByteBuffer outbuf = bufferForPartitions[entry];
                                 while (true) {
                                     appender.reset(outbuf, newBuffer);
-                                    if (appender.append(accessor1, i)) {
+                                    if (appender.append(accessorProbe, i)) {
                                         break;
                                     } else {
                                         write(entry, outbuf);
@@ -374,10 +411,10 @@
                                     }
                                 }
                             } else {
-                                entry = hpc1.partition(accessor1, i, (int) (inputsize0 * factor / nPartitions));
+                                entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
                                 if (entry < memoryForHashtable) {
                                     while (true) {
-                                        if (!ftap.append(accessor1, i)) {
+                                        if (!ftap.append(accessorProbe, i)) {
                                             joiner0.join(inBuffer, writer);
                                             ftap.reset(inBuffer, true);
                                         } else
@@ -390,7 +427,7 @@
                                     ByteBuffer outbuf = bufferForPartitions[entry];
                                     while (true) {
                                         appender.reset(outbuf, newBuffer);
-                                        if (appender.append(accessor1, i)) {
+                                        if (appender.append(accessorProbe, i)) {
                                             break;
                                         } else {
                                             write(entry, outbuf);
@@ -415,8 +452,8 @@
                     if (memoryForHashtable != memsize - 2) {
                         for (int i = 0; i < B; i++) {
                             ByteBuffer buf = bufferForPartitions[i];
-                            accessor1.reset(buf);
-                            if (accessor1.getTupleCount() > 0) {
+                            accessorProbe.reset(buf);
+                            if (accessorProbe.getTupleCount() > 0) {
                                 write(i, buf);
                             }
                             closeWriter(i);
@@ -430,40 +467,43 @@
                             tableSize = (int) (memsize * recordsPerFrame * factor);
                         }
                         for (int partitionid = 0; partitionid < B; partitionid++) {
-                            RunFileWriter rWriter = rWriters[partitionid];
-                            RunFileWriter sWriter = sWriters[partitionid];
-                            if (rWriter == null || sWriter == null) {
+                            RunFileWriter buildWriter = buildWriters[partitionid];
+                            RunFileWriter probeWriter = probeWriters[partitionid];
+                            if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
                                 continue;
                             }
 
                             InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
-                                    hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators));
+                                    hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
+                                    nullWriters1);
 
-                            RunFileReader rReader = rWriter.createReader();
-                            rReader.open();
-                            while (rReader.nextFrame(inBuffer)) {
-                                ByteBuffer copyBuffer = ctx.allocateFrame();
-                                FrameUtils.copy(inBuffer, copyBuffer);
-                                joiner.build(copyBuffer);
-                                inBuffer.clear();
+                            if (buildWriter != null) {
+                                RunFileReader buildReader = buildWriter.createReader();
+                                buildReader.open();
+                                while (buildReader.nextFrame(inBuffer)) {
+                                    ByteBuffer copyBuffer = ctx.allocateFrame();
+                                    FrameUtils.copy(inBuffer, copyBuffer);
+                                    joiner.build(copyBuffer);
+                                    inBuffer.clear();
+                                }
+                                buildReader.close();
                             }
-                            rReader.close();
 
                             // probe
-                            RunFileReader sReader = sWriter.createReader();
-                            sReader.open();
-                            while (sReader.nextFrame(inBuffer)) {
+                            RunFileReader probeReader = probeWriter.createReader();
+                            probeReader.open();
+                            while (probeReader.nextFrame(inBuffer)) {
                                 joiner.join(inBuffer, writer);
                                 inBuffer.clear();
                             }
-                            sReader.close();
+                            probeReader.close();
                             joiner.closeJoin(writer);
                         }
                     }
                     writer.close();
-                    env.set(LARGERELATION, null);
-                    env.set(SMALLRELATION, null);
+                    env.set(PROBERELATION, null);
+                    env.set(BUILDRELATION, null);
                     env.set(JOINER0, null);
                     env.set(MEM_HASHTABLE, null);
                     env.set(NUM_PARTITION, null);
@@ -475,19 +515,19 @@
                 }
 
                 private void closeWriter(int i) throws HyracksDataException {
-                    RunFileWriter writer = sWriters[i];
+                    RunFileWriter writer = probeWriters[i];
                     if (writer != null) {
                         writer.close();
                     }
                 }
 
                 private void write(int i, ByteBuffer head) throws HyracksDataException {
-                    RunFileWriter writer = sWriters[i];
+                    RunFileWriter writer = probeWriters[i];
                     if (writer == null) {
-                        FileReference file = ctx.createWorkspaceFile(largeRelation);
+                        FileReference file = ctx.createWorkspaceFile(relationName);
                         writer = new RunFileWriter(file, ctx.getIOManager());
                         writer.open();
-                        sWriters[i] = writer;
+                        probeWriters[i] = writer;
                     }
                     writer.nextFrame(head);
                 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 4d98c6a..94a9501 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.dataflow.std.join;
 
+import java.io.DataOutput;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -21,8 +22,10 @@
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -30,36 +33,51 @@
 public class InMemoryHashJoin {
     private final Link[] table;
     private final List<ByteBuffer> buffers;
-    private final FrameTupleAccessor accessor0;
-    private final ITuplePartitionComputer tpc0;
-    private final FrameTupleAccessor accessor1;
-    private final ITuplePartitionComputer tpc1;
+    private final FrameTupleAccessor accessorBuild;
+    private final ITuplePartitionComputer tpcBuild;
+    private final FrameTupleAccessor accessorProbe;
+    private final ITuplePartitionComputer tpcProbe;
     private final FrameTupleAppender appender;
     private final FrameTuplePairComparator tpComparator;
     private final ByteBuffer outBuffer;
-
+    private final boolean isLeftOuter;
+    private final ArrayTupleBuilder nullTupleBuild;
+    
     public InMemoryHashJoin(IHyracksStageletContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
-            FrameTuplePairComparator comparator) {
+            FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
+            throws HyracksDataException {
         table = new Link[tableSize];
         buffers = new ArrayList<ByteBuffer>();
-        this.accessor0 = accessor0;
-        this.tpc0 = tpc0;
-        this.accessor1 = accessor1;
-        this.tpc1 = tpc1;
+        this.accessorBuild = accessor1;
+        this.tpcBuild = tpc1;
+        this.accessorProbe = accessor0;
+        this.tpcProbe = tpc0;
         appender = new FrameTupleAppender(ctx.getFrameSize());
         tpComparator = comparator;
         outBuffer = ctx.allocateFrame();
         appender.reset(outBuffer, true);
+        this.isLeftOuter = isLeftOuter;        
+        if (isLeftOuter) {
+            int fieldCountOuter = accessor1.getFieldCount();
+            nullTupleBuild = new ArrayTupleBuilder(fieldCountOuter);
+            DataOutput out = nullTupleBuild.getDataOutput();
+            for (int i = 0; i < fieldCountOuter; i++) {
+                nullWriters1[i].writeNull(out);
+                nullTupleBuild.addFieldEndOffset();
+            }
+        } else {
+            nullTupleBuild = null;
+        }
     }
 
     public void build(ByteBuffer buffer) throws HyracksDataException {
         buffers.add(buffer);
         int bIndex = buffers.size() - 1;
-        accessor0.reset(buffer);
-        int tCount = accessor0.getTupleCount();
+        accessorBuild.reset(buffer);
+        int tCount = accessorBuild.getTupleCount();
         for (int i = 0; i < tCount; ++i) {
-            int entry = tpc0.partition(accessor0, i, table.length);
+            int entry = tpcBuild.partition(accessorBuild, i, table.length);
             long tPointer = (((long) bIndex) << 32) + i;
             Link link = table[entry];
             if (link == null) {
@@ -70,29 +88,41 @@
     }
 
     public void join(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-        accessor1.reset(buffer);
-        int tupleCount1 = accessor1.getTupleCount();
-        for (int i = 0; i < tupleCount1; ++i) {
-            int entry = tpc1.partition(accessor1, i, table.length);
+        accessorProbe.reset(buffer);
+        int tupleCount0 = accessorProbe.getTupleCount();
+        for (int i = 0; i < tupleCount0; ++i) {
+            int entry = tpcProbe.partition(accessorProbe, i, table.length);
             Link link = table[entry];
+            boolean matchFound = false;
             if (link != null) {
                 for (int j = 0; j < link.size; ++j) {
                     long pointer = link.pointers[j];
                     int bIndex = (int) ((pointer >> 32) & 0xffffffff);
                     int tIndex = (int) (pointer & 0xffffffff);
-                    accessor0.reset(buffers.get(bIndex));
-                    int c = tpComparator.compare(accessor0, tIndex, accessor1, i);
+                    accessorBuild.reset(buffers.get(bIndex));
+                    int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
                     if (c == 0) {
-                        if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+                        matchFound = true;
+                        if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
                             flushFrame(outBuffer, writer);
                             appender.reset(outBuffer, true);
-                            if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+                            if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
                                 throw new IllegalStateException();
                             }
                         }
                     }
                 }
             }
+            if (!matchFound && isLeftOuter) {
+                if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+                    flushFrame(outBuffer, writer);
+                    appender.reset(outBuffer, true);
+                    if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild
+                            .getSize())) {
+                        throw new IllegalStateException();
+                    }                  
+                }
+            }
         }
     }
 
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 74f0146..13af25d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -46,6 +48,8 @@
     private final int[] keys1;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final boolean isLeftOuter;
+    private final INullWriterFactory[] nullWriterFactories1;
     private final int tableSize;
 
     public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
@@ -56,8 +60,25 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
-        this.tableSize = tableSize;
         recordDescriptors[0] = recordDescriptor;
+        this.isLeftOuter = false;
+        this.nullWriterFactories1 = null;
+        this.tableSize = tableSize;
+    }
+
+    public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+            int tableSize) {
+        super(spec, 2, 1);
+        this.keys0 = keys0;
+        this.keys1 = keys1;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.comparatorFactories = comparatorFactories;
+        recordDescriptors[0] = recordDescriptor;
+        this.isLeftOuter = isLeftOuter;
+        this.nullWriterFactories1 = nullWriterFactories1;
+        this.tableSize = tableSize;
     }
 
     @Override
@@ -66,10 +87,11 @@
         HashProbeActivityNode hpa = new HashProbeActivityNode();
 
         builder.addTask(hba);
-        builder.addSourceEdge(0, hba, 0);
+        builder.addSourceEdge(1, hba, 0);
 
         builder.addTask(hpa);
-        builder.addSourceEdge(1, hpa, 0);
+        builder.addSourceEdge(0, hpa, 0);
+
         builder.addTargetEdge(0, hpa, 0);
 
         builder.addBlockingEdge(hba, hpa);
@@ -88,6 +110,13 @@
             for (int i = 0; i < comparatorFactories.length; ++i) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
+            final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+            if (isLeftOuter) {
+                for (int i = 0; i < nullWriterFactories1.length; i++) {
+                    nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+                }
+            }
+
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private InMemoryHashJoin joiner;
 
@@ -99,7 +128,7 @@
                             .createPartitioner();
                     joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
                             hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
-                                    keys0, keys1, comparators));
+                                    keys0, keys1, comparators), isLeftOuter, nullWriters1);
                 }
 
                 @Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
new file mode 100644
index 0000000..5ee8fd5
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public class NestedLoopJoin {
+    private final FrameTupleAccessor accessorInner;
+    private final FrameTupleAccessor accessorOuter;
+    private final FrameTupleAppender appender;
+    private final ITuplePairComparator tpComparator;
+    private final ByteBuffer outBuffer;
+    private final ByteBuffer innerBuffer;
+    private final List<ByteBuffer> outBuffers;
+    private final int memSize;
+    private final IHyracksStageletContext ctx;
+    private RunFileReader runFileReader;
+    private int currentMemSize = 0;
+    private final RunFileWriter runFileWriter;
+
+    public NestedLoopJoin(IHyracksStageletContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
+            ITuplePairComparator comparators, int memSize) throws HyracksDataException {
+        this.accessorInner = accessor1;
+        this.accessorOuter = accessor0;
+        this.appender = new FrameTupleAppender(ctx.getFrameSize());
+        this.tpComparator = comparators;
+        this.outBuffer = ctx.allocateFrame();
+        this.innerBuffer = ctx.allocateFrame();
+        this.appender.reset(outBuffer, true);
+        this.outBuffers = new ArrayList<ByteBuffer>();
+        this.memSize = memSize;
+        this.ctx = ctx;
+
+        FileReference file = ctx.getJobletContext().createWorkspaceFile(
+                this.getClass().getSimpleName() + this.toString());
+        runFileWriter = new RunFileWriter(file, ctx.getIOManager());
+        runFileWriter.open();
+    }
+
+    public void cache(ByteBuffer buffer) throws HyracksDataException {
+        runFileWriter.nextFrame(buffer);
+        System.out.println(runFileWriter.getFileSize());
+    }
+
+    public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
+        if (outBuffers.size() < memSize - 3) {
+            createAndCopyFrame(outerBuffer);
+            return;
+        }
+        if (currentMemSize < memSize - 3) {
+            reloadFrame(outerBuffer);
+            return;
+        }
+        for (ByteBuffer outBuffer : outBuffers) {
+            runFileReader = runFileWriter.createReader();
+            runFileReader.open();
+            while (runFileReader.nextFrame(innerBuffer)) {
+                blockJoin(outBuffer, innerBuffer, writer);
+            }
+            runFileReader.close();
+        }
+        currentMemSize = 0;
+        reloadFrame(outerBuffer);
+    }
+
+    private void createAndCopyFrame(ByteBuffer outerBuffer) {
+        ByteBuffer outerBufferCopy = ctx.allocateFrame();
+        FrameUtils.copy(outerBuffer, outerBufferCopy);
+        outBuffers.add(outerBufferCopy);
+        currentMemSize++;
+    }
+
+    private void reloadFrame(ByteBuffer outerBuffer) {
+        outBuffers.get(currentMemSize).clear();
+        FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
+        currentMemSize++;
+    }
+
+    private void blockJoin(ByteBuffer outerBuffer, ByteBuffer innerBuffer, IFrameWriter writer)
+            throws HyracksDataException {
+        accessorOuter.reset(outerBuffer);
+        accessorInner.reset(innerBuffer);
+        int tupleCount0 = accessorOuter.getTupleCount();
+        int tupleCount1 = accessorInner.getTupleCount();
+
+        for (int i = 0; i < tupleCount0; ++i) {
+            for (int j = 0; j < tupleCount1; ++j) {
+                int c = compare(accessorOuter, i, accessorInner, j);
+                if (c == 0) {
+                    if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+                        flushFrame(outBuffer, writer);
+                        appender.reset(outBuffer, true);
+                        if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+                            throw new IllegalStateException();
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    public void closeCache() throws HyracksDataException {
+        if (runFileWriter != null) {
+            runFileWriter.close();
+        }
+    }
+
+    public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+        for (ByteBuffer outBuffer : outBuffers) {
+            runFileReader = runFileWriter.createReader();
+            runFileReader.open();
+            while (runFileReader.nextFrame(innerBuffer)) {
+                blockJoin(outBuffer, innerBuffer, writer);
+            }
+            runFileReader.close();
+        }
+        outBuffers.clear();
+        currentMemSize = 0;
+
+        if (appender.getTupleCount() > 0) {
+            flushFrame(outBuffer, writer);
+        }
+    }
+
+    private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+        writer.nextFrame(buffer);
+        buffer.position(0);
+        buffer.limit(buffer.capacity());
+    }
+
+    private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
+            throws HyracksDataException {
+        int c = tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
+        if (c != 0) {
+            return c;
+        }
+        return 0;
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
new file mode 100644
index 0000000..1436e30
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final String JOINER = "joiner";
+
+    private static final long serialVersionUID = 1L;
+    private final ITuplePairComparatorFactory comparatorFactory;
+    private final int memSize;
+
+    public NestedLoopJoinOperatorDescriptor(JobSpecification spec, ITuplePairComparatorFactory comparatorFactory,
+            RecordDescriptor recordDescriptor, int memSize) {
+        super(spec, 2, 1);
+        this.comparatorFactory = comparatorFactory;
+        this.recordDescriptors[0] = recordDescriptor;
+        this.memSize = memSize;
+    }
+
+    @Override
+    public void contributeTaskGraph(IActivityGraphBuilder builder) {
+        JoinCacheActivityNode jc = new JoinCacheActivityNode();
+        NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode();
+
+        builder.addTask(jc);
+        builder.addSourceEdge(1, jc, 0);
+
+        builder.addTask(nlj);
+        builder.addSourceEdge(0, nlj, 0);
+
+        builder.addTargetEdge(0, nlj, 0);
+        builder.addBlockingEdge(jc, nlj);
+    }
+
+    private class JoinCacheActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+                final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+                int nPartitions) {
+            final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+            final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator();
+
+            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+                private NestedLoopJoin joiner;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize);
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    ByteBuffer copyBuffer = ctx.allocateFrame();
+                    FrameUtils.copy(buffer, copyBuffer);
+                    copyBuffer.flip();
+                    joiner.cache(copyBuffer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    joiner.closeCache();
+                    env.set(JOINER, joiner);
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                }
+            };
+            return op;
+        }
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return NestedLoopJoinOperatorDescriptor.this;
+        }
+    }
+
+    private class NestedLoopJoinActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+                final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+                int nPartitions) {
+
+            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+                private NestedLoopJoin joiner;
+
+                @Override
+                public void open() throws HyracksDataException {
+                    joiner = (NestedLoopJoin) env.get(JOINER);
+                    writer.open();
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    joiner.join(buffer, writer);
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    joiner.closeJoin(writer);
+                    writer.close();
+                    env.set(JOINER, null);
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    writer.flush();
+                }
+            };
+            return op;
+        }
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return NestedLoopJoinOperatorDescriptor.this;
+        }
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
new file mode 100644
index 0000000..f82a7ef
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -0,0 +1,65 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
+public class SplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public SplitOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc, int outputArity) {
+        super(spec, 1, outputArity);
+        for (int i = 0; i < outputArity; i++) {
+            recordDescriptors[i] = rDesc;
+        }
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx, IOperatorEnvironment env,
+            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+            throws HyracksDataException {
+        return new AbstractUnaryInputOperatorNodePushable() {
+            private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+
+            @Override
+            public void close() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.close();
+                }
+            }
+
+            @Override
+            public void flush() throws HyracksDataException {
+            }
+
+            @Override
+            public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    FrameUtils.flushFrame(bufferAccessor, writer);
+                }
+            }
+
+            @Override
+            public void open() throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    writer.open();
+                }
+            }
+
+            @Override
+            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                writers[index] = writer;
+            }
+        };
+    }
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
new file mode 100644
index 0000000..eccb945
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.union;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+
+public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
+    public UnionAllOperatorDescriptor(JobSpecification spec, int nInputs, RecordDescriptor recordDescriptor) {
+        super(spec, nInputs, 1);
+        recordDescriptors[0] = recordDescriptor;
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void contributeTaskGraph(IActivityGraphBuilder builder) {
+        UnionActivityNode uba = new UnionActivityNode();
+        builder.addTask(uba);
+        for (int i = 0; i < inputArity; ++i) {
+            builder.addSourceEdge(i, uba, i);
+        }
+        builder.addTargetEdge(0, uba, 0);
+    }
+
+    private class UnionActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public UnionActivityNode() {
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                throws HyracksDataException {
+            RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+            return new UnionOperator(ctx, inRecordDesc);
+        }
+
+        @Override
+        public IOperatorDescriptor getOwner() {
+            return UnionAllOperatorDescriptor.this;
+        }
+
+    }
+
+    private class UnionOperator extends AbstractUnaryOutputOperatorNodePushable {
+        private int nOpened;
+
+        private int nClosed;
+
+        public UnionOperator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc) {
+            nOpened = 0;
+            nClosed = 0;
+        }
+
+        @Override
+        public int getInputArity() {
+            return inputArity;
+        }
+
+        @Override
+        public IFrameWriter getInputFrameWriter(int index) {
+            return new IFrameWriter() {
+                @Override
+                public void open() throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        if (++nOpened == 1) {
+                            writer.open();
+                        }
+                    }
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        writer.nextFrame(buffer);
+                    }
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        writer.flush();
+                    }
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    synchronized (UnionOperator.this) {
+                        if (++nClosed == inputArity) {
+                            writer.close();
+                        }
+                    }
+                }
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
new file mode 100644
index 0000000..2b32142
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/SplitOperatorTest.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+
+public class SplitOperatorTest extends AbstractIntegrationTest {
+
+    public void compareFiles(String fileNameA, String fileNameB) throws IOException {
+        BufferedReader fileA = new BufferedReader(new FileReader(fileNameA));
+        BufferedReader fileB = new BufferedReader(new FileReader(fileNameB));
+
+        String lineA, lineB;
+        while ((lineA = fileA.readLine()) != null) {
+            lineB = fileB.readLine();
+            Assert.assertEquals(lineA, lineB);
+        }
+        Assert.assertNull(fileB.readLine());
+    }
+
+    @Test
+    public void test() throws Exception {
+        final int outputArity = 2;
+
+        JobSpecification spec = new JobSpecification();
+
+        String inputFileName = "data/words.txt";
+        File[] outputFile = new File[outputArity];
+        for (int i = 0; i < outputArity; i++) {
+            outputFile[i] = File.createTempFile("splitop", null);
+            outputFile[i].deleteOnExit();
+        }
+
+        FileSplit[] inputSplits = new FileSplit[] { new FileSplit(NC1_ID, inputFileName) };
+
+        String[] locations = new String[] { NC1_ID };
+
+        DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory(
+                new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000');
+        RecordDescriptor stringRec = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE, });
+
+        FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, new ConstantFileSplitProvider(
+                inputSplits), stringParser, stringRec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations);
+
+        SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp, locations);
+
+        IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
+        for (int i = 0; i < outputArity; i++) {
+            outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] { new FileSplit(NC1_ID,
+                    outputFile[i].getAbsolutePath()) });
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
+        }
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0);
+        for (int i = 0; i < outputArity; i++) {
+            spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0);
+        }
+
+        for (int i = 0; i < outputArity; i++) {
+            spec.addRoot(outputOp[i]);
+        }
+        runTest(spec);
+
+        for (int i = 0; i < outputArity; i++) {
+            compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 4958a35..db5985d 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -14,7 +14,9 @@
  */
 package edu.uci.ics.hyracks.tests.integration;
 
+import java.io.DataOutput;
 import java.io.File;
+import java.io.IOException;
 
 import org.junit.Test;
 
@@ -23,8 +25,11 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
@@ -51,6 +56,29 @@
 public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
     private static final boolean DEBUG = true;
 
+    static private class NoopNullWriterFactory implements INullWriterFactory {
+
+        private static final long serialVersionUID = 1L;
+        public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory();
+
+        private NoopNullWriterFactory() {
+        }
+
+        @Override
+        public INullWriter createNullWriter() {
+            return new INullWriter() {
+                @Override
+                public void writeNull(DataOutput out) throws HyracksDataException {
+                    try {
+                        out.writeShort(0);
+                    } catch (IOException e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            };
+        }
+    }
+
     /*
      * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL ); TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL, O_COMMENT VARCHAR(79) NOT NULL );
      */
@@ -274,6 +302,251 @@
     }
 
     @Test
+    public void customerOrderCIDInMemoryHashLeftOuterJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                "data/tpch0.001/orders.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        for (int j = 0; j < nullWriterFactories.length; j++) {
+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+        }
+
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
+                new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+                nullWriterFactories, 128);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+        //     "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+        // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDGraceHashLeftOuterJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                "data/tpch0.001/orders.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        for (int j = 0; j < nullWriterFactories.length; j++) {
+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+        }
+
+        GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+                new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+                nullWriterFactories);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+        //     "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+        // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDHybridHashLeftOuterJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+                "data/tpch0.001/orders.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+        for (int j = 0; j < nullWriterFactories.length; j++) {
+            nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+        }
+
+        HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+                new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+                new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+                nullWriterFactories);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+        //     "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+        // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
     public void customerOrderCIDJoinMulti() throws Exception {
         JobSpecification spec = new JobSpecification();
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
new file mode 100644
index 0000000..cab54f4
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderNestedLoopJoinTest.java
@@ -0,0 +1,335 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNReplicatingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+
+public class TPCHCustomerOrderNestedLoopJoinTest extends AbstractIntegrationTest {
+    private static final boolean DEBUG = true;
+
+    static class JoinComparatorFactory implements ITuplePairComparatorFactory {
+        private static final long serialVersionUID = 1L;
+
+        private final IBinaryComparatorFactory bFactory;
+        private final int pos0;
+        private final int pos1;
+
+        public JoinComparatorFactory(IBinaryComparatorFactory bFactory, int pos0, int pos1) {
+            this.bFactory = bFactory;
+            this.pos0 = pos0;
+            this.pos1 = pos1;
+        }
+
+        @Override
+        public ITuplePairComparator createTuplePairComparator() {
+            return new JoinComparator(bFactory.createBinaryComparator(), pos0, pos1);
+        }
+    }
+
+    static class JoinComparator implements ITuplePairComparator {
+
+        private final IBinaryComparator bComparator;
+        private final int field0;
+        private final int field1;
+
+        public JoinComparator(IBinaryComparator bComparator, int field0, int field1) {
+            this.bComparator = bComparator;
+            this.field0 = field0;
+            this.field1 = field1;
+        }
+
+        @Override
+        public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+            int tStart0 = accessor0.getTupleStartOffset(tIndex0);
+            int fStartOffset0 = accessor0.getFieldSlotsLength() + tStart0;
+
+            int tStart1 = accessor1.getTupleStartOffset(tIndex1);
+            int fStartOffset1 = accessor1.getFieldSlotsLength() + tStart1;
+
+            int fStart0 = accessor0.getFieldStartOffset(tIndex0, field0);
+            int fEnd0 = accessor0.getFieldEndOffset(tIndex0, field0);
+            int fLen0 = fEnd0 - fStart0;
+
+            int fStart1 = accessor1.getFieldStartOffset(tIndex1, field1);
+            int fEnd1 = accessor1.getFieldEndOffset(tIndex1, field1);
+            int fLen1 = fEnd1 - fStart1;
+
+            int c = bComparator.compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
+                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            if (c != 0) {
+                return c;
+            }
+            return 0;
+        }
+    }
+
+    /*
+     * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL,
+     * C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY
+     * INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT
+     * NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL );
+     * TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL,
+     * O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE
+     * DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY
+     * CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT
+     * NULL, O_COMMENT VARCHAR(79) NOT NULL );
+     */
+    @Test
+    public void customerOrderCIDJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/customer.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/tpch0.001/orders.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+        NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 4);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDJoinMulti() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 5);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    @Test
+    public void customerOrderCIDJoinAutoExpand() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        NestedLoopJoinOperatorDescriptor join = new NestedLoopJoinOperatorDescriptor(spec, new JoinComparatorFactory(
+                UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 6);
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
+
+        IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+                : new NullSinkOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+        IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
new file mode 100644
index 0000000..b1089fe
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/UnionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.PrinterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
+
+public class UnionTest extends AbstractIntegrationTest {
+    @Test
+    public void union01() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+                new FileSplit(NC2_ID, new FileReference(new File("data/words.txt"))),
+                new FileSplit(NC1_ID, new FileReference(new File("data/words.txt"))) });
+
+        RecordDescriptor desc = new RecordDescriptor(
+                new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor csvScanner01 = new FileScanOperatorDescriptor(
+                spec,
+                splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+                desc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner01, NC2_ID, NC1_ID);
+
+        FileScanOperatorDescriptor csvScanner02 = new FileScanOperatorDescriptor(
+                spec,
+                splitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, ','),
+                desc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner02, NC2_ID, NC1_ID);
+
+        UnionAllOperatorDescriptor unionAll = new UnionAllOperatorDescriptor(spec, 2, desc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, unionAll, NC2_ID, NC1_ID);
+
+        PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC2_ID, NC1_ID);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner01, 0, unionAll, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), csvScanner02, 0, unionAll, 1);
+        spec.connect(new OneToOneConnectorDescriptor(spec), unionAll, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
index 40cb7da..0d5dc8f 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
@@ -32,6 +32,8 @@
 
     public int getTokenLength();
 
+    public int getNumTokens();
+
     public void writeToken(DataOutput dos) throws IOException;
 
     public RecordDescriptor getTokenSchema();
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
index 73635f9..425436b 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
@@ -30,6 +30,7 @@
             new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
 
     private final char delimiter;
+    private final byte typeTag;
     private byte[] data;
     private int start;
     private int length;
@@ -38,8 +39,14 @@
     private int tokenStart;
     private int pos;
 
+    public DelimitedUTF8StringBinaryTokenizer(char delimiter, byte typeTag) {
+        this.delimiter = delimiter;
+        this.typeTag = typeTag;
+    }
+
     public DelimitedUTF8StringBinaryTokenizer(char delimiter) {
         this.delimiter = delimiter;
+        this.typeTag = -1;
     }
 
     @Override
@@ -88,6 +95,9 @@
 
     @Override
     public void writeToken(DataOutput dos) throws IOException {
+        if (typeTag > 0)
+            dos.write(typeTag);
+
         // WARNING: 2-byte length indicator is specific to UTF-8
         dos.writeShort((short) tokenLength);
         dos.write(data, tokenStart, tokenLength);
@@ -97,4 +107,10 @@
     public RecordDescriptor getTokenSchema() {
         return tokenSchema;
     }
+
+    // cannot be implemented for this tokenizer
+    @Override
+    public int getNumTokens() {
+        return -1;
+    }
 }
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
index e3e0be3..2e85db5 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
@@ -22,13 +22,20 @@
 
     private static final long serialVersionUID = 1L;
     private final char delimiter;
+    private final byte typeTag;
+
+    public DelimitedUTF8StringBinaryTokenizerFactory(char delimiter, byte typeTag) {
+        this.delimiter = delimiter;
+        this.typeTag = typeTag;
+    }
 
     public DelimitedUTF8StringBinaryTokenizerFactory(char delimiter) {
         this.delimiter = delimiter;
+        this.typeTag = -1;
     }
 
     @Override
     public IBinaryTokenizer createBinaryTokenizer() {
-        return new DelimitedUTF8StringBinaryTokenizer(delimiter);
+        return new DelimitedUTF8StringBinaryTokenizer(delimiter, typeTag);
     }
 }
diff --git a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
index 54fc371..2cc0b6c 100644
--- a/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
+++ b/hyracks/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
@@ -142,4 +142,9 @@
     public RecordDescriptor getTokenSchema() {
         return tokenSchema;
     }
-}
+
+    @Override
+    public int getNumTokens() {
+        return 0;
+    }
+}
\ No newline at end of file