Add IPredicateEvaluator for issue 423 at the Hyracks level

 Author:    pouria.pirzadeh@gmail.com <pouria.pirzadeh@gmail.com@123451ca-8445-de46-9d55-352943316053>
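
Summary: three new interfaces are added to hyracks-api (IPredicateEvaluator,
IPredicateEvaluatorFactory, IPredicateEvaluatorFactoryProvider), an optional
IPredicateEvaluatorFactoryProvider is threaded through the Algebricks compiler
builder and JobGenContext, and the join operators (in-memory, Grace, hybrid,
optimized hybrid and nested-loop) take an optional IPredicateEvaluatorFactory.
When a factory is supplied, the created evaluator is consulted during matching
(for example, the nested-loop join only emits a pair when both the key
comparator and the predicate accept it); a null factory keeps the previous
behavior.

A compiler that wants to supply predicate evaluators wires the provider through
the builder. Minimal sketch; only the setter is part of this patch, and the
provider instance (myPredEvaluatorFactoryProvider) is hypothetical:

    HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder();
    builder.setPredicateEvaluatorFactoryProvider(myPredEvaluatorFactoryProvider);
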
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index dde4443..e0ee1f0 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -37,6 +37,7 @@
 import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 
 public abstract class AbstractCompilerFactoryBuilder {
 
@@ -50,6 +51,7 @@
     protected IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
     protected IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
     protected IPrinterFactoryProvider printerProvider;
+    protected IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
     protected IExpressionRuntimeProvider expressionRuntimeProvider;
     protected IExpressionTypeComputer expressionTypeComputer;
     protected INullableTypeComputer nullableTypeComputer;
@@ -111,6 +113,14 @@
     public IBinaryComparatorFactoryProvider getComparatorFactoryProvider() {
         return comparatorFactoryProvider;
     }
+    
+    public void setPredicateEvaluatorFactoryProvider(IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider) {
+        this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
+    }
+
+    public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider() {
+        return predEvaluatorFactoryProvider;
+    }
 
     public void setBinaryBooleanInspectorFactory(IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
         this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 2b24bd0..1c62307 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -91,7 +91,7 @@
                                 binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider,
                                 nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
                                 expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
-                                partialAggregationTypeComputer, frameSize, clusterLocations);
+                                partialAggregationTypeComputer, predEvaluatorFactoryProvider, frameSize, clusterLocations);
                         PlanCompiler pc = new PlanCompiler(context);
                         return pc.compilePlan(plan, null, jobEventListenerFactory);
                     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 6da42b4..32089e2 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -40,6 +40,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -106,6 +108,10 @@
             Object t = env.getVarType(v);
             comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
         }
+        
+        IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
+        IPredicateEvaluatorFactory predEvaluatorFactory = (predEvaluatorFactoryProvider == null) ? null : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
+        
         RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
                 propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
@@ -125,7 +131,7 @@
                     case INNER: {
                         opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
                                 maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
-                                hashFunFactories, comparatorFactories, recDescriptor);
+                                hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory);
                         break;
                     }
                     case LEFT_OUTER: {
@@ -135,7 +141,7 @@
                         }
                         opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
                                 maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
-                                hashFunFactories, comparatorFactories, recDescriptor, true, nullWriterFactories);
+                                hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory, true, nullWriterFactories);
                         break;
                     }
                     default: {
@@ -153,7 +159,7 @@
                                 maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
                                 comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
                                         keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
-                                        keysRight, keysLeft));
+                                        keysRight, keysLeft), predEvaluatorFactory);
                         break;
                     }
                     case LEFT_OUTER: {
@@ -165,7 +171,7 @@
                                 maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
                                 comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
                                         keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
-                                        keysRight, keysLeft), true, nullWriterFactories);
+                                        keysRight, keysLeft), predEvaluatorFactory, true, nullWriterFactories);
                         break;
                     }
                     default: {
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index b09c194..a737f90 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -37,6 +37,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
 import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
@@ -86,6 +88,10 @@
             Object t = env.getVarType(v);
             comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
         }
+        
+        IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
+        IPredicateEvaluatorFactory predEvaluatorFactory = (predEvaluatorFactoryProvider == null) ? null : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
+        
         RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
                 propagatedSchema, context);
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
@@ -94,7 +100,7 @@
         switch (kind) {
             case INNER: {
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
-                        comparatorFactories, recDescriptor, tableSize);
+                        comparatorFactories, recDescriptor, tableSize, predEvaluatorFactory);
                 break;
             }
             case LEFT_OUTER: {
@@ -103,7 +109,7 @@
                     nullWriterFactories[j] = context.getNullWriterFactory();
                 }
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
-                        comparatorFactories, recDescriptor, true, nullWriterFactories, tableSize);
+                        comparatorFactories, predEvaluatorFactory, recDescriptor, true, nullWriterFactories, tableSize);
                 break;
             }
             default: {
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 365d1a5..245e5e1 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -41,6 +41,8 @@
 import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 
 public class JobGenContext {
 	private final IOperatorSchema outerFlowSchema;
@@ -61,6 +63,7 @@
 	private final IExpressionTypeComputer expressionTypeComputer;
 	private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
 	private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
+	private final IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
 	private final int frameSize;
 	private AlgebricksPartitionConstraint clusterLocations;
 	private int varCounter;
@@ -86,7 +89,7 @@
 			ITypingContext typingContext,
 			IExpressionEvalSizeComputer expressionEvalSizeComputer,
 			IPartialAggregationTypeComputer partialAggregationTypeComputer,
-			int frameSize, AlgebricksPartitionConstraint clusterLocations) {
+			IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider, int frameSize, AlgebricksPartitionConstraint clusterLocations) {
 		this.outerFlowSchema = outerFlowSchema;
 		this.metadataProvider = metadataProvider;
 		this.appContext = appContext;
@@ -106,6 +109,7 @@
 		this.typingContext = typingContext;
 		this.expressionEvalSizeComputer = expressionEvalSizeComputer;
 		this.partialAggregationTypeComputer = partialAggregationTypeComputer;
+		this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
 		this.frameSize = frameSize;
 		this.varCounter = 0;
 	}
@@ -157,6 +161,10 @@
 	public IPrinterFactoryProvider getPrinterFactoryProvider() {
 		return printerFactoryProvider;
 	}
+	
+	public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider() {
+		return predEvaluatorFactoryProvider;
+	}
 
 	public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
 		return expressionRuntimeProvider;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java
new file mode 100644
index 0000000..5f022ca
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+
+public interface IPredicateEvaluator {
+	public boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1, int tupId1);
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
new file mode 100644
index 0000000..6bf82da
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IPredicateEvaluatorFactory extends Serializable {
+	public IPredicateEvaluator createPredicateEvaluator();
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
new file mode 100644
index 0000000..5646a34
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IPredicateEvaluatorFactoryProvider extends Serializable {
+	public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys0, int[] keys1);
+}
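
For reference, a minimal (hypothetical, not part of this patch) implementation of
the three new interfaces could look like the sketch below. It accepts every tuple
pair; a real evaluator would inspect the key fields addressed by keys0/keys1
through the two frame accessors before deciding:

    // Hypothetical example class, shown only to illustrate the new API.
    package edu.uci.ics.hyracks.examples;

    import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
    import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
    import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;

    public class AlwaysTruePredicateEvaluatorFactoryProvider implements IPredicateEvaluatorFactoryProvider {
        private static final long serialVersionUID = 1L;

        @Override
        public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys0, int[] keys1) {
            return new IPredicateEvaluatorFactory() {
                private static final long serialVersionUID = 1L;

                @Override
                public IPredicateEvaluator createPredicateEvaluator() {
                    return new IPredicateEvaluator() {
                        @Override
                        public boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1, int tupId1) {
                            // Accept every pair; a real implementation would examine the
                            // fields of tuple tupId0 in fta0 and tuple tupId1 in fta1.
                            return true;
                        }
                    };
                }
            };
        }
    }
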
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 6c58d6f..0bb514e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -22,6 +22,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -42,13 +44,14 @@
     private final double factor;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IPredicateEvaluatorFactory predEvaluatorFactory;
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
 
     public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             int recordsPerFrame, double factor, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor) {
+            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory) {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
@@ -58,6 +61,7 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.predEvaluatorFactory = predEvalFactory;
         this.isLeftOuter = false;
         this.nullWriterFactories1 = null;
         recordDescriptors[0] = recordDescriptor;
@@ -66,7 +70,7 @@
     public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             int recordsPerFrame, double factor, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1, IPredicateEvaluatorFactory predEvalFactory) {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
@@ -76,6 +80,7 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.predEvaluatorFactory = predEvalFactory;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
         recordDescriptors[0] = recordDescriptor;
@@ -143,12 +148,13 @@
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(rpartAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(spartAid, 0);
             int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
-
+            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator() );
+            
             return new GraceHashJoinOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
                     RPARTITION_ACTIVITY_ID), partition), new TaskId(new ActivityId(getOperatorId(),
                     SPARTITION_ACTIVITY_ID), partition), recordsPerFrame, factor, keys0, keys1, hashFunctionFactories,
                     comparatorFactories, nullWriterFactories1, rd1, rd0, recordDescriptors[0], numPartitions,
-                    isLeftOuter);
+                    predEvaluator, isLeftOuter);
         }
     }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
index 91f509d..7c9bd88 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -51,12 +52,13 @@
     private final double factor;
     private final int numPartitions;
     private final boolean isLeftOuter;
+    private final IPredicateEvaluator predEvaluator;
 
     GraceHashJoinOperatorNodePushable(IHyracksTaskContext ctx, Object state0Id, Object state1Id, int recordsPerFrame,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
             IBinaryComparatorFactory[] comparatorFactories, INullWriterFactory[] nullWriterFactories,
             RecordDescriptor rd1, RecordDescriptor rd0, RecordDescriptor outRecordDescriptor, int numPartitions,
-            boolean isLeftOuter) {
+            IPredicateEvaluator predEval, boolean isLeftOuter) {
         this.ctx = ctx;
         this.state0Id = state0Id;
         this.state1Id = state1Id;
@@ -70,6 +72,7 @@
         this.numPartitions = numPartitions;
         this.recordsPerFrame = recordsPerFrame;
         this.factor = factor;
+        this.predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
     }
 
@@ -114,7 +117,7 @@
                 table.reset();
                 InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
                         ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
-                        new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table);
+                        new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table, predEvaluator);
 
                 // build
                 if (buildWriter != null) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 4f9b987..ea9414b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -29,6 +29,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -66,6 +68,7 @@
     private final int[] keys1;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IPredicateEvaluatorFactory predEvaluatorFactory;
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
 
@@ -87,7 +90,7 @@
     public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             int recordsPerFrame, double factor, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor) throws HyracksDataException {
+            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory) throws HyracksDataException {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.inputsize0 = inputsize0;
@@ -97,6 +100,7 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.predEvaluatorFactory = predEvalFactory;
         this.isLeftOuter = false;
         this.nullWriterFactories1 = null;
         recordDescriptors[0] = recordDescriptor;
@@ -105,7 +109,7 @@
     public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             int recordsPerFrame, double factor, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1)
+            RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1)
             throws HyracksDataException {
         super(spec, 2, 1);
         this.memsize = memsize;
@@ -116,6 +120,7 @@
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.predEvaluatorFactory = predEvalFactory;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
         recordDescriptors[0] = recordDescriptor;
@@ -189,6 +194,7 @@
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
+            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
@@ -315,7 +321,7 @@
                     state.joiner = new InMemoryHashJoin(ctx, tableSize,
                             new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
-                                    comparators), isLeftOuter, nullWriters1, table);
+                                    comparators), isLeftOuter, nullWriters1, table, predEvaluator);
                     bufferForPartitions = new ByteBuffer[state.nPartitions];
                     state.fWriters = new RunFileWriter[state.nPartitions];
                     for (int i = 0; i < state.nPartitions; i++) {
@@ -377,6 +383,7 @@
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
+            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private BuildAndPartitionTaskState state;
@@ -500,7 +507,7 @@
                             InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
                                     hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
-                                    nullWriters1, table);
+                                    nullWriters1, table, predEvaluator);
 
                             if (buildWriter != null) {
                                 RunFileReader buildReader = buildWriter.createReader();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index ebf3848..2d49190 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -47,18 +48,19 @@
     private final int tableSize;
     private final TuplePointer storedTuplePointer;
     private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
+    private final IPredicateEvaluator predEvaluator;
 
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
             FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
-            ISerializableTable table) throws HyracksDataException {
-        this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, false);
+            ISerializableTable table, IPredicateEvaluator predEval) throws HyracksDataException {
+        this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, predEval, false);
     }
 
     public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
             ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
             FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
-            ISerializableTable table, boolean reverse) throws HyracksDataException {
+            ISerializableTable table, IPredicateEvaluator predEval, boolean reverse) throws HyracksDataException {
         this.tableSize = tableSize;
         this.table = table;
         storedTuplePointer = new TuplePointer();
@@ -71,6 +73,7 @@
         tpComparator = comparator;
         outBuffer = ctx.allocateFrame();
         appender.reset(outBuffer, true);
+        predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
         if (isLeftOuter) {
             int fieldCountOuter = accessor1.getFieldCount();
@@ -121,7 +124,6 @@
                     }
                 } while (true);
         	}
-            
             if (!matchFound && isLeftOuter) {
                 if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
                         nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
@@ -133,7 +135,6 @@
                                 + appender.getBuffer().capacity() + ")");
                     }
                 }
-
             }
         }
     }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index e0a5613..705923a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -29,6 +29,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -53,18 +55,20 @@
     private final int[] keys1;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories;
     private final IBinaryComparatorFactory[] comparatorFactories;
+    private final IPredicateEvaluatorFactory predEvaluatorFactory;
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
     private final int tableSize;
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, int tableSize) {
+            RecordDescriptor recordDescriptor, int tableSize, IPredicateEvaluatorFactory predEvalFactory) {
         super(spec, 2, 1);
         this.keys0 = keys0;
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.predEvaluatorFactory = predEvalFactory;
         recordDescriptors[0] = recordDescriptor;
         this.isLeftOuter = false;
         this.nullWriterFactories1 = null;
@@ -73,18 +77,34 @@
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+            IPredicateEvaluatorFactory predEvalFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
             int tableSize) {
         super(spec, 2, 1);
         this.keys0 = keys0;
         this.keys1 = keys1;
         this.hashFunctionFactories = hashFunctionFactories;
         this.comparatorFactories = comparatorFactories;
+        this.predEvaluatorFactory = predEvalFactory;
         recordDescriptors[0] = recordDescriptor;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
         this.tableSize = tableSize;
     }
+    
+    public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, int tableSize) {
+        this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, recordDescriptor, tableSize, null);
+    }
+    
+    public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
+            IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+            RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+            int tableSize) {
+        this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, null, recordDescriptor, isLeftOuter, nullWriterFactories1, tableSize);
+    }
+    
+    
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
@@ -150,6 +170,7 @@
                     nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
                 }
             }
+            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
 
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private HashBuildTaskState state;
@@ -166,7 +187,7 @@
                     state.joiner = new InMemoryHashJoin(ctx, tableSize,
                             new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
                                     ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
-                                    comparators), isLeftOuter, nullWriters1, table);
+                                    comparators), isLeftOuter, nullWriters1, table, predEvaluator);
                 }
 
                 @Override
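
The two delegating constructors added above preserve the pre-existing
InMemoryHashJoinOperatorDescriptor signatures by passing null for the new
predicate-evaluator factory, so callers that do not use the feature keep
compiling and behave as before. A sketch (spec, keys, factories and the
output record descriptor assumed to be set up as in a typical job):

    IOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, keys0, keys1,
            hashFunctionFactories, comparatorFactories, joinRecDesc, 128); // no predicate evaluation
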
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 546e80f..9496b76 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -47,9 +48,10 @@
     private final RunFileWriter runFileWriter;
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder nullTupleBuilder;
-
+    private final IPredicateEvaluator predEvaluator;
+    
     public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
-            ITuplePairComparator comparators, int memSize, boolean isLeftOuter, INullWriter[] nullWriters1)
+            ITuplePairComparator comparators, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter, INullWriter[] nullWriters1)
             throws HyracksDataException {
         this.accessorInner = accessor1;
         this.accessorOuter = accessor0;
@@ -60,6 +62,7 @@
         this.appender.reset(outBuffer, true);
         this.outBuffers = new ArrayList<ByteBuffer>();
         this.memSize = memSize;
+        this.predEvaluator = predEval;
         this.ctx = ctx;
 
         this.isLeftOuter = isLeftOuter;
@@ -130,8 +133,9 @@
             boolean matchFound = false;
             for (int j = 0; j < tupleCount1; ++j) {
                 int c = compare(accessorOuter, i, accessorInner, j);
-                if (c == 0) {
-                    matchFound = true;
+                boolean predEval = (predEvaluator == null) || (predEvaluator.evaluate(accessorOuter, i, accessorInner, j));
+                if (c == 0 && predEval) {
+                    matchFound = true;
                     if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
                         flushFrame(outBuffer, writer);
                         appender.reset(outBuffer, true);
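
With this change NestedLoopJoin emits the pair (outer i, inner j) only when the
tuple-pair comparator returns 0 and the optional predicate evaluator, when
present, also accepts the pair; passing null for the evaluator keeps the old
behavior. A construction sketch mirroring the call sites in this patch
(ctx, rd0, rd1, comparator and memSize assumed to be in scope; myPredEval is a
hypothetical IPredicateEvaluator):

    NestedLoopJoin join = new NestedLoopJoin(ctx,
            new FrameTupleAccessor(ctx.getFrameSize(), rd0),
            new FrameTupleAccessor(ctx.getFrameSize(), rd1),
            comparator, memSize, myPredEval, false, null);
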
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 0be01c1..d3f664e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -27,6 +27,8 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
@@ -49,16 +51,24 @@
     private static final long serialVersionUID = 1L;
     private final ITuplePairComparatorFactory comparatorFactory;
     private final int memSize;
+    private final IPredicateEvaluatorFactory predEvaluatorFactory;
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
-
+    
     public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
             ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
             boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+        this(spec, comparatorFactory, recordDescriptor, memSize, null, isLeftOuter, nullWriterFactories1);
+    }
+    
+    public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
+            IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
         super(spec, 2, 1);
         this.comparatorFactory = comparatorFactory;
         this.recordDescriptors[0] = recordDescriptor;
         this.memSize = memSize;
+        this.predEvaluatorFactory = predEvalFactory;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
     }
@@ -117,7 +127,8 @@
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
-
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+
             final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
             if (isLeftOuter) {
                 for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -134,7 +145,7 @@
                             partition));
 
                     state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
-                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize, isLeftOuter,
+                            new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize, predEvaluator, isLeftOuter,
                             nullWriters1);
 
                 }
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index c81bf54..993ad5a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -9,6 +9,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -53,7 +54,8 @@
 
     private RunFileWriter[] buildRFWriters; //writing spilled build partitions
     private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
+    
+    private final IPredicateEvaluator predEvaluator;
     private final boolean isLeftOuter;
     private final INullWriter[] nullWriters1;
 
@@ -78,15 +80,15 @@
     private ByteBuffer[] sPartBuffs;    //Buffers for probe spilled partitions (one buffer per spilled partition)
     private ByteBuffer probeResBuff;    //Buffer for probe resident partition tuples
     private ByteBuffer reloadBuffer;    //Buffer for reloading spilled partitions during partition tuning 
-
+    
     private int[] buildPSizeInFrames; //Used for partition tuning
     private int freeFramesCounter; //Used for partition tuning
     
     private boolean isTableEmpty;	//Added for handling the case, where build side is empty (tableSize is 0)
-
+    
     public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
             String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
-            RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc) {
+            RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval) {
         this.ctx = ctx;
         this.memForJoin = memForJoin;
         this.buildRd = buildRd;
@@ -106,6 +108,7 @@
         this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
         this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
 
+        this.predEvaluator = predEval;
         this.isLeftOuter = false;
         this.nullWriters1 = null;
 
@@ -114,7 +117,7 @@
     public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
             String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
             RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
-            boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+            IPredicateEvaluator predEval, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
         this.ctx = ctx;
         this.memForJoin = memForJoin;
         this.buildRd = buildRd;
@@ -133,7 +136,8 @@
 
         this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
         this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
-
+        
+        this.predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
 
         this.nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
@@ -423,7 +427,7 @@
         this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount,
                 new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc, new FrameTupleAccessor(
                         ctx.getFrameSize(), buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
-                        comparators), isLeftOuter, nullWriters1, table);
+                        comparators), isLeftOuter, nullWriters1, table, predEvaluator);
     }
 
     private void cacheInMemJoin() throws HyracksDataException {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 7067080..01c331a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -30,6 +30,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
 import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
@@ -111,7 +113,8 @@
     private final IBinaryComparatorFactory[] comparatorFactories; //For in-mem HJ
     private final ITuplePairComparatorFactory tuplePairComparatorFactory0; //For NLJ in probe
     private final ITuplePairComparatorFactory tuplePairComparatorFactory1; //For NLJ in probe
-
+    private final IPredicateEvaluatorFactory predEvaluatorFactory;
+    
     private final boolean isLeftOuter;
     private final INullWriterFactory[] nullWriterFactories1;
 
@@ -119,7 +122,7 @@
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
             ITuplePairComparatorFactory tupPaircomparatorFactory0,
-            ITuplePairComparatorFactory tupPaircomparatorFactory1, boolean isLeftOuter,
+            ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory, boolean isLeftOuter,
             INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
 
         super(spec, 2, 1);
@@ -133,6 +136,7 @@
         this.tuplePairComparatorFactory0 = tupPaircomparatorFactory0;
         this.tuplePairComparatorFactory1 = tupPaircomparatorFactory1;
         recordDescriptors[0] = recordDescriptor;
+        this.predEvaluatorFactory = predEvaluatorFactory;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
         
@@ -142,7 +146,7 @@
     public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
             double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
             IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
-            ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1)
+            ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory)
             throws HyracksDataException {
 
         super(spec, 2, 1);
@@ -155,6 +159,7 @@
         this.comparatorFactories = comparatorFactories;
         this.tuplePairComparatorFactory0 = tupPaircomparatorFactory0;
         this.tuplePairComparatorFactory1 = tupPaircomparatorFactory1;
+        this.predEvaluatorFactory = predEvaluatorFactory;
         recordDescriptors[0] = recordDescriptor;
         this.isLeftOuter = false;
         this.nullWriterFactories1 = null;
@@ -255,6 +260,8 @@
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
             
+            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+            
             
             IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
@@ -276,12 +283,12 @@
                     if(!isLeftOuter){
                     	state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
                                 PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
-                                buildHpc);
+                                buildHpc, predEvaluator);
                     }
                     else{
                     	state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
                                 PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
-                                buildHpc, isLeftOuter, nullWriterFactories1);
+                                buildHpc, predEvaluator, isLeftOuter, nullWriterFactories1);
                     }
                     
                     state.hybridHJ.initBuild();
@@ -335,7 +342,8 @@
             final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
             final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx);
             final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx);
-
+            final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+            
             for (int i = 0; i < comparatorFactories.length; i++) {
                 comparators[i] = comparatorFactories[i].createBinaryComparator();
             }
@@ -456,7 +464,7 @@
                                     nPartitions);
                            
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
-                                    probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc);
+                                    probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator);
 
                             buildSideReader.open();
                             rHHj.initBuild();
@@ -518,7 +526,7 @@
                                     nPartitions);
                             
                             rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
-                                    buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc);
+                                    buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator);
 
                             probeSideReader.open();
                             rHHj.initBuild();
@@ -586,7 +594,7 @@
                     InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
                             ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
                             buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
-                            isLeftOuter, nullWriters1, table, reverse);
+                            isLeftOuter, nullWriters1, table, predEvaluator, reverse);
 
                     bReader.open();
                     rPartbuff.clear();
@@ -614,7 +622,7 @@
                         throws HyracksDataException {
 
                     NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
-                            new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, false, null);
+                            new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, false, null);
 
                     ByteBuffer cacheBuff = ctx.allocateFrame();
                     innerReader.open();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
index 0999ee8..b7fd241 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
@@ -90,7 +90,7 @@
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc, new JoinComparatorFactory(
                         PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0));
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
@@ -168,7 +168,7 @@
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc, new JoinComparatorFactory(
                         PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0));
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
@@ -247,7 +247,7 @@
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc, new JoinComparatorFactory(
                         PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
-                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0));
+                new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index b5eb850..9b35867 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -122,7 +122,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128);
+                custOrderJoinDesc, 128, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -205,7 +205,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc);
+                custOrderJoinDesc, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -288,7 +288,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc);
+                custOrderJoinDesc, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -372,7 +372,7 @@
                 new int[] { 1 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, true, nullWriterFactories, 128);
+                null, custOrderJoinDesc, true, nullWriterFactories, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -460,7 +460,7 @@
                 new int[] { 1 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, true, nullWriterFactories);
+                custOrderJoinDesc, true, nullWriterFactories, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -548,7 +548,7 @@
                 new int[] { 1 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, true, nullWriterFactories);
+                custOrderJoinDesc, null, true, nullWriterFactories);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -629,7 +629,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128);
+                custOrderJoinDesc, 128, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -720,7 +720,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc);
+                custOrderJoinDesc, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -811,7 +811,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc);
+                custOrderJoinDesc, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -898,7 +898,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128);
+                custOrderJoinDesc, 128, null);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -991,7 +991,7 @@
                 new int[] { 0 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, 128);
+                custOrderJoinDesc, 128, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
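What the new constructor argument buys at run time is an optional veto on key-matched pairs. The helper below is a hypothetical illustration of that null-tolerant check pattern, not the exact Hyracks join internals, and it uses the same assumed evaluate(...) signature as the sketch above: a null evaluator short-circuits to "accept", which is why every call site in this patch can safely pass null.

    import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
    import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;

    // Hypothetical helper illustrating the optional-predicate check inside a join's matching loop.
    public final class PredicateCheck {
        private PredicateCheck() {
        }

        public static boolean accept(IPredicateEvaluator predEvaluator, IFrameTupleAccessor probeAccessor,
                int probeTupleIndex, IFrameTupleAccessor buildAccessor, int buildTupleIndex) {
            // A null evaluator means "no extra predicate": every key-matched pair survives,
            // which preserves the pre-patch behavior of the operators changed above.
            return predEvaluator == null
                    || predEvaluator.evaluate(probeAccessor, probeTupleIndex, buildAccessor, buildTupleIndex);
        }
    }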
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 6df9ff8..8dab4de 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -210,7 +210,7 @@
                     new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                             .of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    custOrderJoinDesc);
+                    custOrderJoinDesc, null);
 
         } else if ("hybridhash".equalsIgnoreCase(algo)) {
             join = new HybridHashJoinOperatorDescriptor(
@@ -224,7 +224,7 @@
                     new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                             .of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    custOrderJoinDesc);
+                    custOrderJoinDesc, null);
 
         } else {
             join = new InMemoryHashJoinOperatorDescriptor(
@@ -234,7 +234,7 @@
                     new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
                             .of(UTF8StringPointable.FACTORY) },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                    custOrderJoinDesc, 6000000);
+                    custOrderJoinDesc, 6000000, null);
         }
 
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);