[ASTERIXDB-3004][RT] Improve hash join performance

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Improve hash join performance when joined values are NULL/MISSING
- Add SqlppHashJoinRQJTest to test different hash join scenarios
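- Evaluate join-key NULL/MISSING checks once per input tuple instead of once
  per candidate tuple pair. The IPredicateEvaluator interface changes
  accordingly (sketch of the before/after signatures from this patch):

    // before: invoked on every candidate tuple pair during matching
    boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1, int tupId1);

    // after: invoked once per tuple on each side, so tuples with NULL/MISSING
    // keys can be rejected before matching
    boolean evaluate(IFrameTupleAccessor fta, int tupId);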

Change-Id: I8f0afb05908e8281f2865775e074d459964fe989
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14784
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 2328bf4..ccdf620 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -73,11 +73,10 @@
         }
         integrationUtil.init(cleanup, configFile);
 
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("initializing HDFS");
-        }
-
         if (startHdfs) {
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER.info("initializing HDFS");
+            }
             HDFSCluster.getInstance().setup();
         }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHashJoinRQJTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHashJoinRQJTest.java
new file mode 100644
index 0000000..40ee38e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHashJoinRQJTest.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.runtime;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.nio.IntBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import org.apache.asterix.api.http.server.QueryServiceRequestParameters;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.lang.sqlpp.parser.SqlppHint;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.ParameterTypeEnum;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.math3.random.MersenneTwister;
+import org.apache.commons.math3.random.RandomGenerator;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * RQG testsuite for hash joins.
+ * Tests:
+ * <ul>
+ * <li>Fields with / without NULL or MISSING values</li>
+ * <li>Inner / Left Outer joins</li>
+ * <li>Repartitioning / Broadcast joins</li>
+ * </ul>
+ */
+@RunWith(Parameterized.class)
+public class SqlppHashJoinRQJTest {
+
+    static final Logger LOGGER = LogManager.getLogger(SqlppHashJoinRQJTest.class);
+
+    static final String CONF_PROPERTY_SEED =
+            SqlppRQGTestBase.getConfigurationPropertyName(SqlppHashJoinRQJTest.class, "seed");
+    static final long CONF_PROPERTY_SEED_DEFAULT = System.currentTimeMillis();
+
+    static final String CONF_PROPERTY_LIMIT =
+            SqlppRQGTestBase.getConfigurationPropertyName(SqlppHashJoinRQJTest.class, "limit");
+    static final int CONF_PROPERTY_LIMIT_DEFAULT = 40;
+
+    static final String CONF_PROPERTY_OFFSET =
+            SqlppRQGTestBase.getConfigurationPropertyName(SqlppHashJoinRQJTest.class, "offset");
+    static final int CONF_PROPERTY_OFFSET_DEFAULT = 0;
+
+    static final Path OUTPUT_DIR = Paths.get("target", SqlppHashJoinRQJTest.class.getSimpleName());
+
+    static final String DATAVERSE_NAME = "dvTest";
+    static final String[] DATASET_NAMES = new String[] { "ds1", "ds2" };
+    static final String ID_COLUMN_NAME = "id";
+    static final String BASE_COLUMN_NAME = "i";
+    static final List<Integer> DATASET_ROWS = Arrays.asList(20000, 40000);
+    static final List<Integer> DATASET_COLUMNS = Arrays.asList(4, 10, 100, 1000, 10000);
+    static final int DATASET_COLUMN_LENGTH_MIN =
+            String.valueOf(DATASET_COLUMNS.stream().mapToInt(Integer::intValue).max().orElse(0)).length();
+    static final int DATASET_COLUMN_LENGTH_MAX = Math.max(20, DATASET_COLUMN_LENGTH_MIN);
+    static final int NULLABLE_COLUMN_RATIO = 2;
+    static final int OUTER_JOIN_RATIO = 3;
+    static final int BROADCAST_RATIO = 4;
+
+    static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    static final ObjectReader OBJECT_READER = OBJECT_MAPPER.readerFor(ObjectNode.class);
+
+    static long datasetRowCount;
+    static int datasetColumnLength;
+    static TestExecutor testExecutor;
+
+    final TestInstance testInstance;
+
+    public SqlppHashJoinRQJTest(TestInstance testInstance) {
+        this.testInstance = testInstance;
+    }
+
+    @Parameterized.Parameters(name = "SqlppHashJoinRQJTest {index}: {0}")
+    public static Collection<TestInstance> tests() {
+        long seed = SqlppRQGTestBase.getLongConfigurationProperty(CONF_PROPERTY_SEED, CONF_PROPERTY_SEED_DEFAULT);
+        int limit =
+                (int) SqlppRQGTestBase.getLongConfigurationProperty(CONF_PROPERTY_LIMIT, CONF_PROPERTY_LIMIT_DEFAULT);
+        int testOffset =
+                (int) SqlppRQGTestBase.getLongConfigurationProperty(CONF_PROPERTY_OFFSET, CONF_PROPERTY_OFFSET_DEFAULT);
+
+        LOGGER.info(String.format("Testsuite configuration: -D%s=%d -D%s=%d -D%s=%d", CONF_PROPERTY_SEED, seed,
+                CONF_PROPERTY_LIMIT, limit, CONF_PROPERTY_OFFSET, testOffset));
+
+        RandomGenerator random = new MersenneTwister(seed);
+        datasetRowCount = randomElement(DATASET_ROWS, random);
+        datasetColumnLength =
+                DATASET_COLUMN_LENGTH_MIN + random.nextInt(DATASET_COLUMN_LENGTH_MAX - DATASET_COLUMN_LENGTH_MIN);
+
+        LOGGER.info(String.format("Dataset row count=%d, column length=%d", datasetRowCount, datasetColumnLength));
+
+        LinkedHashMap<IntBuffer, TestInstance> testCases = new LinkedHashMap<>();
+        int i = 0;
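+        // Draw random configurations until `limit` distinct ones are collected; duplicates (same
+        // signature) are skipped so each test instance covers a different column/nullability/join combination.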
+        while (i < limit) {
+            int c0 = randomElement(DATASET_COLUMNS, random);
+            boolean c0nullable = random.nextInt(NULLABLE_COLUMN_RATIO) == 0;
+            int c1 = randomElement(DATASET_COLUMNS, random);
+            boolean c1nullable = random.nextInt(NULLABLE_COLUMN_RATIO) == 0;
+            boolean outerJoin = random.nextInt(OUTER_JOIN_RATIO) == 0;
+            boolean broadcast = random.nextInt(BROADCAST_RATIO) == 0;
+            TestInstance test = new TestInstance(i, c0, c0nullable, c1, c1nullable, outerJoin, broadcast);
+            IntBuffer testSignature = test.signature();
+            if (testCases.containsKey(testSignature)) {
+                continue;
+            }
+            if (i >= testOffset) {
+                testCases.put(testSignature, test);
+            }
+            i++;
+        }
+        return testCases.values();
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        testExecutor = new TestExecutor();
+        LangExecutionUtil.setUp(SqlppRQGTestBase.TEST_CONFIG_FILE_NAME, testExecutor, false);
+
+        FileUtils.forceMkdir(OUTPUT_DIR.toFile());
+        for (String datasetName : DATASET_NAMES) {
+            Path datasetFilePath = OUTPUT_DIR.resolve(datasetName + ".adm");
+            LOGGER.info("Writing data file: " + datasetFilePath.toAbsolutePath());
+            try (PrintWriter pw = new PrintWriter(datasetFilePath.toFile())) {
+                for (int i = 0; i < datasetRowCount; i++) {
+                    writeRecord(pw, datasetName, i);
+                }
+            }
+        }
+
+        StringBuilder sb = new StringBuilder(2048);
+        addDropDataverse(sb, DATAVERSE_NAME);
+        addCreateDataverse(sb, DATAVERSE_NAME);
+        for (String datasetName : DATASET_NAMES) {
+            addCreateDataset(sb, DATAVERSE_NAME, datasetName);
+            addLoadDataset(sb, DATAVERSE_NAME, datasetName);
+        }
+        executeUpdateOrDdl(sb.toString());
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    @Test
+    public void test() throws Exception {
+        LOGGER.info(testInstance);
+        testInstance.execute();
+    }
+
+    private static void addDropDataverse(StringBuilder sb, String dataverseName) {
+        sb.append(String.format("DROP DATAVERSE %s IF EXISTS;\n", dataverseName));
+    }
+
+    private static void addCreateDataverse(StringBuilder sb, String dataverseName) {
+        sb.append(String.format("CREATE DATAVERSE %s;\n", dataverseName));
+    }
+
+    private static void addCreateDataset(StringBuilder sb, String dataverseName, String datasetName) {
+        sb.append("CREATE DATASET ").append(dataverseName).append('.').append(datasetName);
+        sb.append(" (").append(ID_COLUMN_NAME).append(" string");
+        sb.append(") ");
+        sb.append("OPEN TYPE PRIMARY KEY ").append(ID_COLUMN_NAME).append(";\n");
+    }
+
+    private static void addLoadDataset(StringBuilder sb, String dataverseName, String datasetName) {
+        sb.append(String.format(
+                "LOAD DATASET %s.%s USING localfs((`path`=`asterix_nc1://%s/%s.adm`),(`format`=`adm`));%n",
+                dataverseName, datasetName, OUTPUT_DIR, datasetName));
+    }
+
+    private static void writeRecord(PrintWriter pw, String datasetName, int id) throws IOException {
+        pw.print("{");
+        pw.print(String.format("\"%s\": \"%s:%d\"", ID_COLUMN_NAME, datasetName, id));
+        int nColumns = DATASET_COLUMNS.size();
+        for (int i = 0; i < nColumns; i++) {
+            long c = DATASET_COLUMNS.get(i);
+            writeColumn(pw, c, false, id); // no NULL/MISSING
+            writeColumn(pw, c, true, id); // with NULL/MISSING
+        }
+        pw.println("}");
+    }
+
+    private static String getColumnName(long c, boolean nullable) {
+        return BASE_COLUMN_NAME + c + (nullable ? "n" : "");
+    }
+
+    private static void writeColumn(Appendable out, long c, boolean nullable, long id) throws IOException {
+        String columnName = getColumnName(c, nullable);
+        boolean isNull = false;
+        long v;
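+        // For a nullable column with c distinct values, ids cycle with period 2*c: the first c ids in
+        // each cycle take values 1..c, the remaining c alternate between NULL and MISSING, so about half
+        // of the rows carry a value, a quarter are NULL, and a quarter are MISSING.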
+        if (nullable) {
+            long r = id % (2 * c);
+            if (r < c) {
+                v = r + 1;
+            } else if (r % 2 == 0) {
+                v = 0;
+                isNull = true;
+            } else {
+                // MISSING -> nothing to do
+                return;
+            }
+        } else {
+            long r = id % c;
+            v = r + 1;
+        }
+        String text;
+        if (isNull) {
+            text = "null";
+        } else {
+            int cLen = datasetColumnLength;
+            StringBuilder textBuilder = new StringBuilder(cLen + 2);
+            textBuilder.append('"').append(v);
+            int pad = cLen - (textBuilder.length() - 1);
+            for (int i = 0; i < pad; i++) {
+                textBuilder.append(' ');
+            }
+            textBuilder.append('"');
+            text = textBuilder.toString();
+        }
+        out.append(String.format(",\"%s\":%s", columnName, text));
+    }
+
+    private static void executeUpdateOrDdl(String statement) throws Exception {
+        LOGGER.debug("Executing: " + statement);
+        testExecutor.executeSqlppUpdateOrDdl(statement, TestCaseContext.OutputFormat.CLEAN_JSON);
+    }
+
+    private static Pair<ArrayNode, String> executeQuery(String query, boolean fetchPlan) throws Exception {
+        LOGGER.debug("Executing: " + query);
+
+        List<TestCase.CompilationUnit.Parameter> params;
+        if (fetchPlan) {
+            TestCase.CompilationUnit.Parameter planParameter = new TestCase.CompilationUnit.Parameter();
+            planParameter.setName(QueryServiceRequestParameters.Parameter.OPTIMIZED_LOGICAL_PLAN.str());
+            planParameter.setValue(Boolean.TRUE.toString());
+            planParameter.setType(ParameterTypeEnum.STRING);
+            params = Collections.singletonList(planParameter);
+        } else {
+            params = Collections.emptyList();
+        }
+
+        try (InputStream resultStream = testExecutor.executeQueryService(query, TestCaseContext.OutputFormat.CLEAN_JSON,
+                testExecutor.getEndpoint(Servlets.QUERY_SERVICE), params, true, StandardCharsets.UTF_8)) {
+            JsonNode r = OBJECT_READER.readTree(resultStream);
+            JsonNode errors = r.get("errors");
+            if (errors != null) {
+                Assert.fail("Query failed: " + errors);
+            }
+            JsonNode results = r.get("results");
+            if (!results.isArray()) {
+                Assert.fail("Expected array result, got: " + results);
+            }
+            ArrayNode resultsArray = (ArrayNode) results;
+            String plan = fetchPlan ? r.get("plans").get("optimizedLogicalPlan").asText() : null;
+            return new Pair<>(resultsArray, plan);
+        }
+    }
+
+    private static <T> T randomElement(List<T> list, RandomGenerator randomGenerator) {
+        return list.get(randomGenerator.nextInt(list.size()));
+    }
+
+    private static class TestInstance {
+
+        private final int id;
+
+        private final int c0;
+        private final int c1;
+        private final boolean c0nullable;
+        private final boolean c1nullable;
+        private final String col0;
+        private final String col1;
+        private final boolean outerJoin;
+        private final boolean broadcastJoin;
+
+        public TestInstance(int id, int c0, boolean c0nullable, int c1, boolean c1nullable, boolean outerJoin,
+                boolean broadcastJoin) {
+            this.id = id;
+            this.outerJoin = outerJoin;
+            this.c0 = c0;
+            this.c1 = c1;
+            this.c0nullable = c0nullable;
+            this.c1nullable = c1nullable;
+            this.broadcastJoin = broadcastJoin;
+            this.col0 = getColumnName(c0, c0nullable);
+            this.col1 = getColumnName(c1, c1nullable);
+        }
+
+        IntBuffer signature() {
+            return IntBuffer.wrap(
+                    new int[] { c0, toInt(c0nullable), c1, toInt(c1nullable), toInt(outerJoin), toInt(broadcastJoin) });
+        }
+
+        void execute() throws Exception {
+            String query = createQuery();
+            Pair<ArrayNode, String> res = executeQuery(query, true);
+            String plan = res.second;
+            if (!plan.contains(PhysicalOperatorTag.HYBRID_HASH_JOIN.toString())) {
+                Assert.fail(PhysicalOperatorTag.HYBRID_HASH_JOIN + " operator was not used in query plan " + plan);
+            }
+            if (broadcastJoin && !plan.contains(PhysicalOperatorTag.BROADCAST_EXCHANGE.toString())) {
+                Assert.fail(PhysicalOperatorTag.BROADCAST_EXCHANGE + " operator was not used in query plan " + plan);
+            }
+            ArrayNode resultArray = res.first;
+
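+            // Column i<c> holds values 1..c, so the equi-join matches values 1..min(c0, c1) and grouping by
+            // (t1.col0, t2.col1) yields min(c0, c1) rows. A LEFT OUTER join adds one row per unmatched left
+            // value (max(0, c0 - c1)) plus, if col0 is nullable, one NULL group and one MISSING group.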
+            long expectedRowCount;
+            long expectedRowCountInnerJoin = Math.min(c0, c1);
+            if (outerJoin) {
+                expectedRowCount = expectedRowCountInnerJoin + (c0nullable ? 2 : 0) + Math.max(0, c0 - c1);
+            } else {
+                expectedRowCount = expectedRowCountInnerJoin;
+            }
+
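+            // Each matched value occurs datasetRowCount / c0 (halved if nullable) times on the left and
+            // datasetRowCount / c1 (halved if nullable) times on the right; each group's count is the product.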
+            long expectedAggCountInnerJoin = (datasetRowCount * datasetRowCount) / (((long) c0) * c1)
+                    / (c0nullable ? 2 : 1) / (c1nullable ? 2 : 1);
+
+            int actualRowCount = resultArray.size();
+            if (actualRowCount != expectedRowCount) {
+                String commentHash = String.format("%s;%s", this, query);
+                File fHash = SqlppRQGTestBase.writeResult(OUTPUT_DIR, resultArray, id, "hash", commentHash);
+                Assert.fail(String.format("Unexpected row count %d for query #%d [%s]. Expected row count: %d %n %s ",
+                        actualRowCount, id, this, expectedRowCount, fHash.getAbsolutePath()));
+            }
+
+            String col0Alias = String.format("%s_%s", DATASET_NAMES[0], col0);
+
+            for (int i = 0; i < actualRowCount; i++) {
+                JsonNode resultRecord = resultArray.get(i);
+                long actualAggCount = resultRecord.get("cnt").longValue();
+
+                long expectedAggCount;
+                if (outerJoin) {
+                    JsonNode col0Node = resultRecord.get(col0Alias);
+                    if (col0Node == null || col0Node.isNull()) {
+                        expectedAggCount = datasetRowCount / 4;
+                    } else {
+                        if (getValueAsLong(col0Node) > c1) {
+                            expectedAggCount = datasetRowCount / (c0nullable ? 2 : 1) / c0;
+                        } else {
+                            expectedAggCount = expectedAggCountInnerJoin;
+                        }
+                    }
+                } else {
+                    expectedAggCount = expectedAggCountInnerJoin;
+                }
+
+                if (actualAggCount != expectedAggCount) {
+                    String commentHash = String.format("%s;%s", this, query);
+                    File fHash = SqlppRQGTestBase.writeResult(OUTPUT_DIR, resultArray, id, "hash", commentHash);
+                    Assert.fail(String.format(
+                            "Unexpected agg count %d in row %d for query #%d [%s]. Expected agg count: %d %n %s ",
+                            actualAggCount, i, id, this, expectedAggCount, fHash.getAbsolutePath()));
+                }
+            }
+        }
+
+        private long getValueAsLong(JsonNode node) throws Exception {
+            String text = node.textValue().trim();
+            if (text.isEmpty()) {
+                throw new Exception("Unexpected result value: " + node);
+            }
+            try {
+                return Long.parseLong(text);
+            } catch (NumberFormatException e) {
+                throw new Exception("Unexpected result value: " + node);
+            }
+        }
+
+        String createQuery() {
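+            // e.g. USE dvTest; SELECT t1.i4 AS ds1_i4, t2.i10n AS ds2_i10n, count(*) AS cnt
+            //      FROM ds1 t1 INNER JOIN ds2 t2 ON t1.i4 /**/ = t2.i10n /**/
+            //      GROUP BY t1.i4, t2.i10n ORDER BY t1.i4, t2.i10n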
+            return String.format(
+                    "USE %s; SELECT t1.%s AS %s_%s, t2.%s AS %s_%s, count(*) AS cnt FROM %s t1 %s JOIN %s t2 ON t1.%s /*%s*/ = t2.%s /*%s*/ GROUP BY t1.%s, t2.%s ORDER BY t1.%s, t2.%s",
+                    DATAVERSE_NAME, col0, DATASET_NAMES[0], col0, col1, DATASET_NAMES[1], col1, DATASET_NAMES[0],
+                    getJoinKind(), DATASET_NAMES[1], col0, getJoinHint(), col1, getGroupByHint(), col0, col1, col0,
+                    col1);
+        }
+
+        private String getJoinKind() {
+            return outerJoin ? "LEFT OUTER" : "INNER";
+        }
+
+        private String getJoinHint() {
+            return broadcastJoin ? "+" + SqlppHint.HASH_BROADCAST_JOIN_HINT.getIdentifier() : "";
+        }
+
+        private String getGroupByHint() {
+            return "";
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s ON %s=%s %s", getJoinKind(), col0, col1, getJoinHint());
+        }
+
+        static int toInt(boolean b) {
+            return b ? 1 : 0;
+        }
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppNumericIndexRQGTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppNumericIndexRQGTest.java
index 5201681..e97a389 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppNumericIndexRQGTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppNumericIndexRQGTest.java
@@ -127,7 +127,7 @@
     @BeforeClass
     public static void setUp() throws Exception {
         testExecutor = new TestExecutor();
-        LangExecutionUtil.setUp(SqlppRQGTestBase.TEST_CONFIG_FILE_NAME, testExecutor);
+        LangExecutionUtil.setUp(SqlppRQGTestBase.TEST_CONFIG_FILE_NAME, testExecutor, false);
 
         StringBuilder sb = new StringBuilder(2048);
         addDropDataverse(sb, DATAVERSE_NAME);
@@ -293,7 +293,7 @@
             }
         }
         sb.append(") ");
-        sb.append("OPEN TYPE PRIMARY KEY id;\n");
+        sb.append("OPEN TYPE PRIMARY KEY ").append(ID_COLUMN_NAME).append(";\n");
     }
 
     private static void addLoadDataset(StringBuilder sb, String dataverseName, String datasetName) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppRQGTestBase.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppRQGTestBase.java
index ec4b55f..dbf214b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppRQGTestBase.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppRQGTestBase.java
@@ -381,7 +381,7 @@
 
     protected static void startAsterix() throws Exception {
         testExecutor = new TestExecutor();
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor, false);
         loadAsterixData();
     }
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.1.query.sqlpp
new file mode 100644
index 0000000..337226d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.1.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test hash join when values on both sides are MISSING
+ */
+
+with
+R as (
+  from range(1, 50000) r
+  select (case when get_year(current_date()) > 0 then missing else r end) as r
+),
+
+S as (
+  from range(1, 50000) s
+  select (case when get_year(current_date()) > 0 then missing else s end) as s
+)
+
+select count(*) cnt
+from R, S
+where R.r = S.s;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.2.query.sqlpp
new file mode 100644
index 0000000..94de090
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.2.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test hash join when values on both sides are NULL
+ */
+
+with
+R as (
+  from range(1, 50000) r
+  select (case when get_year(current_date()) > 0 then null else r end) as r
+),
+
+S as (
+  from range(1, 50000) s
+  select (case when get_year(current_date()) > 0 then null else s end) as s
+)
+
+select count(*) cnt
+from R, S
+where R.r = S.s;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.1.adm
new file mode 100644
index 0000000..bacb60c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.1.adm
@@ -0,0 +1 @@
+{ "cnt": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.2.adm
new file mode 100644
index 0000000..bacb60c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.2.adm
@@ -0,0 +1 @@
+{ "cnt": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index d400c27..c1aed2a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6630,6 +6630,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="join">
+      <compilation-unit name="hash_join_missing">
+        <output-dir compare="Text">hash_join_missing</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="join">
       <compilation-unit name="hash_join_record">
         <output-dir compare="Text">hash_join_record</output-dir>
       </compilation-unit>
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/PredicateEvaluatorFactoryProvider.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/PredicateEvaluatorFactoryProvider.java
index ab9f4e0..286d1cc 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/PredicateEvaluatorFactoryProvider.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/PredicateEvaluatorFactoryProvider.java
@@ -26,31 +26,24 @@
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 
-/*
-Provides PredicateEvaluator for equi-join cases to properly take care of NULL fields, being compared with each other.
-If any of the join keys, from either side, is NULL, record should not pass equi-join condition.
-*/
+/**
+ * Provides a PredicateEvaluator for equi-join cases to disqualify tuples having NULL/MISSING fields.
+ * If any of the join keys, from either side, is NULL/MISSING, the tuple will not pass the equi-join condition.
+ */
 public class PredicateEvaluatorFactoryProvider implements IPredicateEvaluatorFactoryProvider {
 
     private static final long serialVersionUID = 1L;
     public static final PredicateEvaluatorFactoryProvider INSTANCE = new PredicateEvaluatorFactoryProvider();
 
     @Override
-    public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(final int[] keys0, final int[] keys1) {
+    public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(final int[] keys) {
 
         return new IPredicateEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
             public IPredicateEvaluator createPredicateEvaluator() {
-                return new IPredicateEvaluator() {
-
-                    @Override
-                    public boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1,
-                            int tupId1) {
-                        return noNullOrMissingInKeys(fta0, tupId0, keys0) && noNullOrMissingInKeys(fta1, tupId1, keys1);
-                    }
-                };
+                return (fta, tupId) -> noNullOrMissingInKeys(fta, tupId, keys);
             }
         };
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 7e7d012..cc89deb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -109,10 +109,11 @@
         IBinaryHashFunctionFamily[] rightHashFunFamilies =
                 JobGenHelper.variablesToBinaryHashFunctionFamilies(keysRightBranch, env, context);
 
-        IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
-                context.getPredicateEvaluatorFactoryProvider();
-        IPredicateEvaluatorFactory predEvaluatorFactory = predEvaluatorFactoryProvider == null ? null
-                : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
+        IPredicateEvaluatorFactoryProvider predEvalFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
+        IPredicateEvaluatorFactory leftPredEvalFactory =
+                predEvalFactoryProvider == null ? null : predEvalFactoryProvider.getPredicateEvaluatorFactory(keysLeft);
+        IPredicateEvaluatorFactory rightPredEvalFactory = predEvalFactoryProvider == null ? null
+                : predEvalFactoryProvider.getPredicateEvaluatorFactory(keysRight);
 
         RecordDescriptor recDescriptor =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
@@ -131,7 +132,7 @@
 
         opDesc = generateOptimizedHashJoinRuntime(context, joinOp, inputSchemas, keysLeft, keysRight,
                 leftHashFunFamilies, rightHashFunFamilies, comparatorFactory, reverseComparatorFactory,
-                predEvaluatorFactory, recDescriptor, spec);
+                leftPredEvalFactory, rightPredEvalFactory, recDescriptor, spec);
         opDesc.setSourceLocation(op.getSourceLocation());
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
@@ -145,20 +146,20 @@
             AbstractBinaryJoinOperator joinOp, IOperatorSchema[] inputSchemas, int[] keysLeft, int[] keysRight,
             IBinaryHashFunctionFamily[] leftHashFunFamilies, IBinaryHashFunctionFamily[] rightHashFunFamilies,
             ITuplePairComparatorFactory comparatorFactory, ITuplePairComparatorFactory reverseComparatorFactory,
-            IPredicateEvaluatorFactory predEvaluatorFactory, RecordDescriptor recDescriptor,
-            IOperatorDescriptorRegistry spec) throws AlgebricksException {
+            IPredicateEvaluatorFactory leftPredEvalFactory, IPredicateEvaluatorFactory rightPredEvalFactory,
+            RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) throws AlgebricksException {
         int memSizeInFrames = localMemoryRequirements.getMemoryBudgetInFrames();
         switch (kind) {
             case INNER:
                 return new OptimizedHybridHashJoinOperatorDescriptor(spec, memSizeInFrames, maxInputBuildSizeInFrames,
                         getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies, rightHashFunFamilies, recDescriptor,
-                        comparatorFactory, reverseComparatorFactory, predEvaluatorFactory);
+                        comparatorFactory, reverseComparatorFactory, leftPredEvalFactory, rightPredEvalFactory);
             case LEFT_OUTER:
                 IMissingWriterFactory[] nonMatchWriterFactories = JobGenHelper.createMissingWriterFactories(context,
                         ((LeftOuterJoinOperator) joinOp).getMissingValue(), inputSchemas[1].getSize());
                 return new OptimizedHybridHashJoinOperatorDescriptor(spec, memSizeInFrames, maxInputBuildSizeInFrames,
                         getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies, rightHashFunFamilies, recDescriptor,
-                        comparatorFactory, reverseComparatorFactory, predEvaluatorFactory, true,
+                        comparatorFactory, reverseComparatorFactory, leftPredEvalFactory, rightPredEvalFactory, true,
                         nonMatchWriterFactories);
             default:
                 throw new NotImplementedException();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index 84dda11..b96b887 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -44,8 +44,6 @@
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -93,11 +91,6 @@
         IBinaryHashFunctionFactory[] rightHashFunFactories =
                 JobGenHelper.variablesToBinaryHashFunctionFactories(keysRightBranch, env, context);
 
-        IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
-                context.getPredicateEvaluatorFactoryProvider();
-        IPredicateEvaluatorFactory predEvaluatorFactory = predEvaluatorFactoryProvider == null ? null
-                : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
-
         RecordDescriptor recDescriptor =
                 JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
@@ -116,15 +109,14 @@
         switch (kind) {
             case INNER:
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, leftHashFunFactories,
-                        rightHashFunFactories, comparatorFactory, recDescriptor, tableSize, predEvaluatorFactory,
-                        memSizeInFrames);
+                        rightHashFunFactories, comparatorFactory, recDescriptor, tableSize, memSizeInFrames);
                 break;
             case LEFT_OUTER:
                 IMissingWriterFactory[] nonMatchWriterFactories = JobGenHelper.createMissingWriterFactories(context,
                         ((LeftOuterJoinOperator) joinOp).getMissingValue(), inputSchemas[1].getSize());
                 opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, leftHashFunFactories,
-                        rightHashFunFactories, comparatorFactory, predEvaluatorFactory, recDescriptor, true,
-                        nonMatchWriterFactories, tableSize, memSizeInFrames);
+                        rightHashFunFactories, comparatorFactory, recDescriptor, true, nonMatchWriterFactories,
+                        tableSize, memSizeInFrames);
                 break;
             default:
                 throw new NotImplementedException();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluator.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluator.java
index 5fbc1ab..20381f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluator.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluator.java
@@ -21,9 +21,6 @@
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 
-/*
- * Compares two tuples to make sure that records, whose comparison keys are NULL do not pass comparator filter
- */
 public interface IPredicateEvaluator {
-    public boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1, int tupId1);
+    boolean evaluate(IFrameTupleAccessor fta, int tupId);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
index eb94514..8748587 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
@@ -22,9 +22,8 @@
 import java.io.Serializable;
 
 /*
- * Provides PredicateEvaluator for equi-join related operators
+ * Provides PredicateEvaluator
  */
-
 public interface IPredicateEvaluatorFactory extends Serializable {
-    public IPredicateEvaluator createPredicateEvaluator();
+    IPredicateEvaluator createPredicateEvaluator();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
index 3eefc29..bc3e5c8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
@@ -24,7 +24,6 @@
 /*
  * Provides PredicateEvaluatorFactory based on (equi-join) keys
  */
-
 public interface IPredicateEvaluatorFactoryProvider extends Serializable {
-    public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys0, int[] keys1);
+    IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys);
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinUtil.java
index 398dee9..7cee242 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinUtil.java
@@ -28,16 +28,21 @@
     private HybridHashJoinUtil() {
     }
 
+    public enum SIDE {
+        BUILD,
+        PROBE
+    }
+
     /**
      * Prints out the detailed information for partitions: in-memory and spilled partitions.
      * This method exists for a debug purpose.
      */
-    public String printPartitionInfo(BitSet spilledStatus, OptimizedHybridHashJoin.SIDE whichSide, int numOfPartitions,
-            int[] probePSizeInTups, int[] buildPSizeInTups, RunFileWriter[] probeRFWriters,
-            RunFileWriter[] buildRFWriters, IPartitionedTupleBufferManager bufferManager) {
+    public String printPartitionInfo(BitSet spilledStatus, SIDE whichSide, int numOfPartitions, int[] probePSizeInTups,
+            int[] buildPSizeInTups, RunFileWriter[] probeRFWriters, RunFileWriter[] buildRFWriters,
+            IPartitionedTupleBufferManager bufferManager) {
         StringBuilder buf = new StringBuilder();
         buf.append(">>> " + this + " " + Thread.currentThread().getId() + " printInfo():" + "\n");
-        if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
+        if (whichSide == SIDE.BUILD) {
             buf.append("BUILD side" + "\n");
         } else {
             buf.append("PROBE side" + "\n");
@@ -49,7 +54,7 @@
         int spilledPartByteSize = 0;
         for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
                 spilledStatus.nextSetBit(pid + 1)) {
-            if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
+            if (whichSide == SIDE.BUILD) {
                 spilledTupleCount += buildPSizeInTups[pid];
                 spilledPartByteSize += buildRFWriters[pid].getFileSize();
                 buf.append("part:\t" + pid + "\t#tuple:\t" + buildPSizeInTups[pid] + "\tsize(MB):\t"
@@ -70,7 +75,7 @@
         int inMemoryPartByteSize = 0;
         for (int pid = spilledStatus.nextClearBit(0); pid >= 0 && pid < numOfPartitions; pid =
                 spilledStatus.nextClearBit(pid + 1)) {
-            if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
+            if (whichSide == SIDE.BUILD) {
                 inMemoryTupleCount += buildPSizeInTups[pid];
                 inMemoryPartByteSize += bufferManager.getPhysicalSize(pid);
             } else {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index cb63b6a..4c2019e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -28,7 +28,6 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -59,7 +58,6 @@
     private final ISerializableTable table;
     private final TuplePointer storedTuplePointer;
     private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
-    private final IPredicateEvaluator predEvaluator;
     private final TupleInFrameListAccessor tupleAccessor;
     // To release frames
     private final ISimpleFrameBufferManager bufferManager;
@@ -70,17 +68,16 @@
     public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe,
             ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
             ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
-            ISerializableTable table, IPredicateEvaluator predEval, ISimpleFrameBufferManager bufferManager)
-            throws HyracksDataException {
+            ISerializableTable table, ISimpleFrameBufferManager bufferManager) throws HyracksDataException {
         this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, isLeftOuter, missingWritersBuild, table,
-                predEval, false, bufferManager);
+                false, bufferManager);
     }
 
     public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe,
             ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
             ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
-            ISerializableTable table, IPredicateEvaluator predEval, boolean reverse,
-            ISimpleFrameBufferManager bufferManager) throws HyracksDataException {
+            ISerializableTable table, boolean reverse, ISimpleFrameBufferManager bufferManager)
+            throws HyracksDataException {
         this.table = table;
         storedTuplePointer = new TuplePointer();
         buffers = new ArrayList<>();
@@ -89,7 +86,6 @@
         this.accessorProbe = accessorProbe;
         this.tpcProbe = tpcProbe;
         appender = new FrameTupleAppender(new VSizeFrame(ctx));
-        predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
         if (isLeftOuter) {
             int fieldCountOuter = accessorBuild.getFieldCount();
@@ -178,11 +174,8 @@
                 accessorBuild.reset(buffers.get(bIndex));
                 int c = tpComparator.compare(accessorProbe, tid, accessorBuild, tIndex);
                 if (c == 0) {
-                    boolean predEval = evaluatePredicate(tid, tIndex);
-                    if (predEval) {
-                        matchFound = true;
-                        appendToResult(tid, tIndex, writer);
-                    }
+                    matchFound = true;
+                    appendToResult(tid, tIndex, writer);
                 }
             }
         }
@@ -228,14 +221,6 @@
         table.close();
     }
 
-    private boolean evaluatePredicate(int tIx1, int tIx2) {
-        if (reverseOutputOrder) { //Role Reversal Optimization is triggered
-            return (predEvaluator == null) || predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1);
-        } else {
-            return (predEvaluator == null) || predEvaluator.evaluate(accessorProbe, tIx1, accessorBuild, tIx2);
-        }
-    }
-
     private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException {
         if (reverseOutputOrder) {
             FrameUtils.appendConcatToWriter(writer, appender, accessorBuild, buildSidetIx, accessorProbe, probeSidetIx);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 33976a8..f89ccb0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -32,8 +32,6 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -63,7 +61,6 @@
     private final IBinaryHashFunctionFactory[] hashFunctionFactories0;
     private final IBinaryHashFunctionFactory[] hashFunctionFactories1;
     private final ITuplePairComparatorFactory comparatorFactory;
-    private final IPredicateEvaluatorFactory predEvaluatorFactory;
     private final boolean isLeftOuter;
     private final IMissingWriterFactory[] nonMatchWriterFactories;
     private final int tableSize;
@@ -73,23 +70,21 @@
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories0, IBinaryHashFunctionFactory[] hashFunctionFactories1,
             ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int tableSize,
-            IPredicateEvaluatorFactory predEvalFactory, int memSizeInFrames) {
-        this(spec, keys0, keys1, hashFunctionFactories0, hashFunctionFactories1, comparatorFactory, predEvalFactory,
-                recordDescriptor, false, null, tableSize, memSizeInFrames);
+            int memSizeInFrames) {
+        this(spec, keys0, keys1, hashFunctionFactories0, hashFunctionFactories1, comparatorFactory, recordDescriptor,
+                false, null, tableSize, memSizeInFrames);
     }
 
     public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
             IBinaryHashFunctionFactory[] hashFunctionFactories0, IBinaryHashFunctionFactory[] hashFunctionFactories1,
-            ITuplePairComparatorFactory comparatorFactory, IPredicateEvaluatorFactory predEvalFactory,
-            RecordDescriptor recordDescriptor, boolean isLeftOuter, IMissingWriterFactory[] missingWriterFactories1,
-            int tableSize, int memSizeInFrames) {
+            ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+            IMissingWriterFactory[] missingWriterFactories1, int tableSize, int memSizeInFrames) {
         super(spec, 2, 1);
         this.keys0 = keys0;
         this.keys1 = keys1;
         this.hashFunctionFactories0 = hashFunctionFactories0;
         this.hashFunctionFactories1 = hashFunctionFactories1;
         this.comparatorFactory = comparatorFactory;
-        this.predEvaluatorFactory = predEvalFactory;
         this.outRecDescs[0] = recordDescriptor;
         this.isLeftOuter = isLeftOuter;
         this.nonMatchWriterFactories = missingWriterFactories1;
@@ -159,8 +154,6 @@
             } else {
                 nullWriters1 = null;
             }
-            final IPredicateEvaluator predEvaluator =
-                    (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
 
             final int memSizeInBytes = memSizeInFrames * jobletCtx.getInitialFrameSize();
             final IDeallocatableFramePool framePool = new DeallocatableFramePool(jobletCtx, memSizeInBytes);
@@ -178,8 +171,7 @@
                     state = new HashBuildTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
                     ISerializableTable table = new SerializableHashTable(tableSize, jobletCtx, bufferManager);
                     state.joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(rd0), hpc0,
-                            new FrameTupleAccessor(rd1), rd1, hpc1, isLeftOuter, nullWriters1, table, predEvaluator,
-                            bufferManager);
+                            new FrameTupleAccessor(rd1), rd1, hpc1, isLeftOuter, nullWriters1, table, bufferManager);
                 }
 
                 @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 03ff72f..414ae25 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -27,7 +27,6 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -63,22 +62,20 @@
     private final RunFileWriter runFileWriter;
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder missingTupleBuilder;
-    private final IPredicateEvaluator predEvaluator;
-    // Added for handling correct calling for predicate-evaluator upon recursive calls
+    // Added for correctly handling recursive calls
     // (in OptimizedHybridHashJoin) that cause role-reversal
     private final boolean isReversed;
     private final BufferInfo tempInfo = new BufferInfo(null, -1, -1);
     private final BitSet outerMatchLOJ;
 
     public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter,
-            FrameTupleAccessor accessorInner, int memBudgetInFrames, IPredicateEvaluator predEval, boolean isLeftOuter,
+            FrameTupleAccessor accessorInner, int memBudgetInFrames, boolean isLeftOuter,
             IMissingWriter[] missingWriters) throws HyracksDataException {
-        this(jobletContext, accessorOuter, accessorInner, memBudgetInFrames, predEval, isLeftOuter, missingWriters,
-                false);
+        this(jobletContext, accessorOuter, accessorInner, memBudgetInFrames, isLeftOuter, missingWriters, false);
     }
 
     public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter,
-            FrameTupleAccessor accessorInner, int memBudgetInFrames, IPredicateEvaluator predEval, boolean isLeftOuter,
+            FrameTupleAccessor accessorInner, int memBudgetInFrames, boolean isLeftOuter,
             IMissingWriter[] missingWriters, boolean isReversed) throws HyracksDataException {
         this.accessorInner = accessorInner;
         this.accessorOuter = accessorOuter;
@@ -97,7 +94,6 @@
                 new VariableFramePool(jobletContext, outerBufferMngrMemBudgetInBytes), FrameFreeSlotPolicyFactory
                         .createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, outerBufferMngrMemBudgetInFrames));
 
-        this.predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
         if (isLeftOuter) {
             if (isReversed) {
@@ -202,8 +198,7 @@
             boolean matchFound = false;
             for (int j = 0; j < innerTupleCount; ++j) {
                 int c = tpComparator.compare(accessorOuter, i, accessorInner, j);
-                boolean prdEval = evaluatePredicate(i, j);
-                if (c == 0 && prdEval) {
+                if (c == 0) {
                     matchFound = true;
                     appendToResults(i, j, writer);
                 }
@@ -214,14 +209,6 @@
         }
     }
 
-    private boolean evaluatePredicate(int tIx1, int tIx2) {
-        if (isReversed) { //Role Reversal Optimization is triggered
-            return ((predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1));
-        } else {
-            return ((predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2));
-        }
-    }
-
     private void appendToResults(int outerTupleId, int innerTupleId, IFrameWriter writer) throws HyracksDataException {
         if (isReversed) {
             appendResultToFrame(accessorInner, innerTupleId, accessorOuter, outerTupleId, writer);
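
With the pair-wise predicate hook removed, a NestedLoopJoin match is decided by the tuple-pair comparator alone. A minimal sketch of the resulting inner loop, assuming outerTupleCount mirrors innerTupleCount and that the outerMatchLOJ bit set flags matched outer tuples for left-outer handling (the exact bookkeeping may differ):

    for (int i = 0; i < outerTupleCount; ++i) {
        boolean matchFound = false;
        for (int j = 0; j < innerTupleCount; ++j) {
            // The comparator alone decides the match now.
            if (tpComparator.compare(accessorOuter, i, accessorInner, j) == 0) {
                matchFound = true;
                appendToResults(i, j, writer);
            }
        }
        if (isLeftOuter && matchFound) {
            outerMatchLOJ.set(i); // assumed: unmatched outer tuples get MISSING appended later
        }
    }
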
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 1de8094..24e1b45 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -29,8 +29,6 @@
 import org.apache.hyracks.api.dataflow.TaskId;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -52,25 +50,16 @@
     private static final long serialVersionUID = 1L;
     private final ITuplePairComparatorFactory comparatorFactory;
     private final int memSize;
-    private final IPredicateEvaluatorFactory predEvaluatorFactory;
     private final boolean isLeftOuter;
     private final IMissingWriterFactory[] nullWriterFactories1;
 
     public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
             ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
             boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
-        this(spec, comparatorFactory, recordDescriptor, memSize, null, isLeftOuter, nullWriterFactories1);
-    }
-
-    public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
-            ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
-            IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter,
-            IMissingWriterFactory[] nullWriterFactories1) {
         super(spec, 2, 1);
         this.comparatorFactory = comparatorFactory;
         this.outRecDescs[0] = recordDescriptor;
         this.memSize = memSize;
-        this.predEvaluatorFactory = predEvalFactory;
         this.isLeftOuter = isLeftOuter;
         this.nullWriterFactories1 = nullWriterFactories1;
     }
@@ -117,8 +106,6 @@
             final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final IPredicateEvaluator predEvaluator =
-                    (predEvaluatorFactory != null) ? predEvaluatorFactory.createPredicateEvaluator() : null;
 
             final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null;
             if (isLeftOuter) {
@@ -134,8 +121,7 @@
                 public void open() throws HyracksDataException {
                     state = new JoinCacheTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
                     state.joiner = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(rd0),
-                            new FrameTupleAccessor(rd1), memSize, predEvaluator, isLeftOuter, nullWriters1);
-
+                            new FrameTupleAccessor(rd1), memSize, isLeftOuter, nullWriters1);
                 }
 
                 @Override
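
Callers of the slimmed-down descriptor simply drop the factory argument. A hedged usage sketch against the remaining constructor (variable names assumed):

    IOperatorDescriptor nlj = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory,
            recordDescriptor, memSize, isLeftOuter, missingWriterFactories);
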
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index f5eb4f7..7a9bb25 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -59,11 +59,6 @@
     // Used for special probe BigObject which can not be held into the Join memory
     private FrameTupleAppender bigFrameAppender;
 
-    public enum SIDE {
-        BUILD,
-        PROBE
-    }
-
     private final IHyracksJobletContext jobletCtx;
 
     private final String buildRelName;
@@ -74,7 +69,8 @@
     private final RecordDescriptor probeRd;
     private final RunFileWriter[] buildRFWriters; //writing spilled build partitions
     private final RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-    private final IPredicateEvaluator predEvaluator;
+    private final IPredicateEvaluator buildPredEval;
+    private final IPredicateEvaluator probePredEval;
     private final boolean isLeftOuter;
     private final IMissingWriter[] nonMatchWriters;
     private final BitSet spilledStatus; //0=resident, 1=spilled
@@ -98,8 +94,8 @@
 
     public OptimizedHybridHashJoin(IHyracksJobletContext jobletCtx, int memSizeInFrames, int numOfPartitions,
             String probeRelName, String buildRelName, RecordDescriptor probeRd, RecordDescriptor buildRd,
-            ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval,
-            boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
+            ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator probePredEval,
+            IPredicateEvaluator buildPredEval, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
         this.jobletCtx = jobletCtx;
         this.memSizeInFrames = memSizeInFrames;
         this.buildRd = buildRd;
@@ -113,8 +109,12 @@
         this.probeRFWriters = new RunFileWriter[numOfPartitions];
         this.accessorBuild = new FrameTupleAccessor(buildRd);
         this.accessorProbe = new FrameTupleAccessor(probeRd);
-        this.predEvaluator = predEval;
         this.isLeftOuter = isLeftOuter;
+        if (isLeftOuter && probePredEval != null) {
+            throw new IllegalStateException("probe-side predicate evaluator is not allowed for left outer join");
+        }
+        this.buildPredEval = buildPredEval;
+        this.probePredEval = probePredEval;
         this.isReversed = false;
         this.spilledStatus = new BitSet(numOfPartitions);
         this.nonMatchWriters = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null;
@@ -141,11 +141,12 @@
         accessorBuild.reset(buffer);
         int tupleCount = accessorBuild.getTupleCount();
         for (int i = 0; i < tupleCount; ++i) {
-            int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
-            processTupleBuildPhase(i, pid);
-            buildPSizeInTups[pid]++;
+            if (buildPredEval == null || buildPredEval.evaluate(accessorBuild, i)) {
+                int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
+                processTupleBuildPhase(i, pid);
+                buildPSizeInTups[pid]++;
+            }
         }
-
     }
 
     private void processTupleBuildPhase(int tid, int pid) throws HyracksDataException {
@@ -217,8 +218,8 @@
 
         ISerializableTable table = new SerializableHashTable(inMemTupCount, jobletCtx, bufferManagerForHashTable);
         this.inMemJoiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRd), probeHpc,
-                new FrameTupleAccessor(buildRd), buildRd, buildHpc, isLeftOuter, nonMatchWriters, table, predEvaluator,
-                isReversed, bufferManagerForHashTable);
+                new FrameTupleAccessor(buildRd), buildRd, buildHpc, isLeftOuter, nonMatchWriters, table, isReversed,
+                bufferManagerForHashTable);
 
         buildHashTable();
     }
@@ -473,22 +474,28 @@
     public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
         accessorProbe.reset(buffer);
         int tupleCount = accessorProbe.getTupleCount();
-
-        if (isBuildRelAllInMemory()) {
-            inMemJoiner.join(buffer, writer);
-            return;
-        }
         inMemJoiner.resetAccessorProbe(accessorProbe);
-        for (int i = 0; i < tupleCount; ++i) {
-            int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
-
-            if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase
-                if (spilledStatus.get(pid)) { //pid is Spilled
-                    processTupleProbePhase(i, pid);
-                } else { //pid is Resident
+        if (isBuildRelAllInMemory()) {
+            for (int i = 0; i < tupleCount; ++i) {
+                // NOTE: probePredEval is guaranteed to be null for outer joins and when roles are reversed
+                if (probePredEval == null || probePredEval.evaluate(accessorProbe, i)) {
                     inMemJoiner.join(i, writer);
                 }
-                probePSizeInTups[pid]++;
+            }
+        } else {
+            for (int i = 0; i < tupleCount; ++i) {
+                // NOTE: probePredEval is guaranteed to be null for outer joins and when roles are reversed
+                if (probePredEval == null || probePredEval.evaluate(accessorProbe, i)) {
+                    int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
+                    if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase
+                        if (spilledStatus.get(pid)) { //pid is Spilled
+                            processTupleProbePhase(i, pid);
+                        } else { //pid is Resident
+                            inMemJoiner.join(i, writer);
+                        }
+                        probePSizeInTups[pid]++;
+                    }
+                }
             }
         }
     }
@@ -600,7 +607,10 @@
         return bufferManager.getPhysicalSize(pid);
     }
 
-    public void setIsReversed(boolean b) {
-        this.isReversed = b;
+    public void setIsReversed(boolean reversed) {
+        if (reversed && (buildPredEval != null || probePredEval != null)) {
+            throw new IllegalStateException("predicate evaluators are not compatible with role reversal");
+        }
+        this.isReversed = reversed;
     }
 }
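
Splitting the evaluator per side lets each relation drop tuples before they are partitioned or hashed, which is how NULL/MISSING join keys are filtered out early instead of being carried through the join. A hypothetical evaluator in the single-accessor shape used by the evaluate(...) calls above; JOIN_KEY_FIELD, NULL_TAG and MISSING_TAG are assumed constants rather than the real AsterixDB values, and the lambda assumes IPredicateEvaluator is a single-method interface after this change:

    // Sketch only: rejects tuples whose join-key field carries a NULL/MISSING type tag.
    IPredicateEvaluator skipNullMissing = (accessor, tupleIx) -> {
        int start = accessor.getAbsoluteFieldStartOffset(tupleIx, JOIN_KEY_FIELD);
        byte tag = accessor.getBuffer().array()[start]; // assumes a heap-backed frame buffer
        return tag != NULL_TAG && tag != MISSING_TAG;
    };

Such an evaluator would be passed as buildPredEval and/or probePredEval; a tuple it rejects can never produce a match, so skipping it shrinks both the hash table and the probe work.
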
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index dcccd61..555e8fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -131,7 +131,8 @@
     private final IBinaryHashFunctionFamily[] buildHashFunctionFactories;
     private final ITuplePairComparatorFactory tuplePairComparatorFactoryProbe2Build; //For HHJ & NLJ in probe
     private final ITuplePairComparatorFactory tuplePairComparatorFactoryBuild2Probe; //For HHJ & NLJ in probe
-    private final IPredicateEvaluatorFactory predEvaluatorFactory;
+    private final IPredicateEvaluatorFactory probePredEvalFactory;
+    private final IPredicateEvaluatorFactory buildPredEvalFactory;
 
     private final boolean isLeftOuter;
     private final IMissingWriterFactory[] nonMatchWriterFactories;
@@ -148,8 +149,9 @@
             IBinaryHashFunctionFamily[] propHashFunctionFactories,
             IBinaryHashFunctionFamily[] buildHashFunctionFactories, RecordDescriptor recordDescriptor,
             ITuplePairComparatorFactory tupPaircomparatorFactory01,
-            ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory,
-            boolean isLeftOuter, IMissingWriterFactory[] nonMatchWriterFactories) {
+            ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvalFactory0,
+            IPredicateEvaluatorFactory predEvalFactory1, boolean isLeftOuter,
+            IMissingWriterFactory[] nonMatchWriterFactories) {
         super(spec, 2, 1);
         this.memSizeInFrames = memSizeInFrames;
         this.inputsize0 = inputsize0;
@@ -161,7 +163,8 @@
         this.tuplePairComparatorFactoryProbe2Build = tupPaircomparatorFactory01;
         this.tuplePairComparatorFactoryBuild2Probe = tupPaircomparatorFactory10;
         outRecDescs[0] = recordDescriptor;
-        this.predEvaluatorFactory = predEvaluatorFactory;
+        this.probePredEvalFactory = predEvalFactory0;
+        this.buildPredEvalFactory = predEvalFactory1;
         this.isLeftOuter = isLeftOuter;
         this.nonMatchWriterFactories = nonMatchWriterFactories;
     }
@@ -171,10 +174,11 @@
             IBinaryHashFunctionFamily[] propHashFunctionFactories,
             IBinaryHashFunctionFamily[] buildHashFunctionFactories, RecordDescriptor recordDescriptor,
             ITuplePairComparatorFactory tupPaircomparatorFactory01,
-            ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory) {
+            ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvalFactory0,
+            IPredicateEvaluatorFactory predEvalFactory1) {
         this(spec, memSizeInFrames, inputsize0, factor, keys0, keys1, propHashFunctionFactories,
                 buildHashFunctionFactories, recordDescriptor, tupPaircomparatorFactory01, tupPaircomparatorFactory10,
-                predEvaluatorFactory, false, null);
+                predEvalFactory0, predEvalFactory1, false, null);
     }
 
     @Override
@@ -268,8 +272,10 @@
 
             final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
-            final IPredicateEvaluator predEvaluator =
-                    (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+            final IPredicateEvaluator buildPredEval =
+                    (buildPredEvalFactory == null ? null : buildPredEvalFactory.createPredicateEvaluator());
+            final IPredicateEvaluator probePredEval = (probePredEvalFactory == null || isLeftOuter ? null
+                    : probePredEvalFactory.createPredicateEvaluator());
 
             return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
@@ -293,7 +299,7 @@
                             getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions);
                     state.hybridHJ = new OptimizedHybridHashJoin(ctx.getJobletContext(), state.memForJoin,
                             state.numOfPartitions, PROBE_REL, BUILD_REL, probeRd, buildRd, probeHpc, buildHpc,
-                            predEvaluator, isLeftOuter, nonMatchWriterFactories);
+                            probePredEval, buildPredEval, isLeftOuter, nonMatchWriterFactories);
 
                     state.hybridHJ.initBuild();
                     if (LOGGER.isTraceEnabled()) {
@@ -366,8 +372,6 @@
             final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final ITuplePairComparator probComp = tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
             final ITuplePairComparator buildComp = tuplePairComparatorFactoryBuild2Probe.createTuplePairComparator(ctx);
-            final IPredicateEvaluator predEvaluator =
-                    predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator();
 
             final IMissingWriter[] nonMatchWriter =
                     isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
@@ -593,7 +597,7 @@
                     OptimizedHybridHashJoin rHHj;
                     int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
                     rHHj = new OptimizedHybridHashJoin(jobletCtx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeRd,
-                            buildRd, probeHpc, buildHpc, predEvaluator, isLeftOuter, nonMatchWriterFactories); //checked-confirmed
+                            buildRd, probeHpc, buildHpc, null, null, isLeftOuter, nonMatchWriterFactories); //checked-confirmed
 
                     rHHj.setIsReversed(isReversed);
                     try {
@@ -748,7 +752,7 @@
                     ISerializableTable table = new SerializableHashTable(tabSize, jobletCtx, bufferManager);
                     InMemoryHashJoin joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRDesc),
                             hpcRepProbe, new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, isLeftOuter,
-                            nonMatchWriter, table, predEvaluator, isReversed, bufferManager);
+                            nonMatchWriter, table, isReversed, bufferManager);
                     joiner.setComparator(comp);
                     try {
                         bReader.open();
@@ -805,8 +809,7 @@
                     boolean isReversed = outerRd == buildRd && innerRd == probeRd;
                     ITuplePairComparator nljComptorOuterInner = isReversed ? buildComp : probComp;
                     NestedLoopJoin nlj = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(outerRd),
-                            new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter, nonMatchWriter,
-                            isReversed);
+                            new FrameTupleAccessor(innerRd), memorySize, isLeftOuter, nonMatchWriter, isReversed);
                     nlj.setComparator(nljComptorOuterInner);
 
                     IFrame cacheBuff = new VSizeFrame(jobletCtx);
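
A hedged wiring sketch for the reworked descriptor constructor: an inner join may filter on both sides, while for a left outer join the probe-side evaluator is never created (see the isLeftOuter check above), matching the guard in OptimizedHybridHashJoin. skipNullsFactory and the other argument names are assumptions:

    OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(
            spec, memSizeInFrames, inputsize0, fudgeFactor, probeKeys, buildKeys,
            probeHashFnFamilies, buildHashFnFamilies, recordDescriptor,
            probeToBuildComparatorFactory, buildToProbeComparatorFactory,
            skipNullsFactory /* probe */, skipNullsFactory /* build */, false, null);
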
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
index 93739df..9215856 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
@@ -26,7 +26,6 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -61,7 +60,6 @@
     static RecordDescriptor buildRd;
     static ITuplePartitionComputer probeHpc;
     static ITuplePartitionComputer buildHpc;
-    static IPredicateEvaluator predEval;
     int memSizeInFrames = -1;
     int numOfPartitions = -1;
     boolean isLeftOuter = false;
@@ -151,7 +149,7 @@
     private void testJoin(int memSizeInFrames, int numOfPartitions, VSizeFrame frame) throws HyracksDataException {
 
         hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, probeRd,
-                buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
+                buildRd, probeHpc, buildHpc, null, null, isLeftOuter, null);
 
         hhj.initBuild();
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index a8bb1d5..296b682 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -133,7 +133,7 @@
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 128,
-                null, 128);
+                128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -176,12 +176,12 @@
                 new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
 
-        OptimizedHybridHashJoinOperatorDescriptor join =
-                new OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20, 1.2, new int[] { 1 }, new int[] { 0 },
-                        new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
-                        new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
-                        custOrderJoinDesc, new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
-                        new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null, false, null);
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20,
+                1.2, new int[] { 1 }, new int[] { 0 },
+                new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
+                new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }, custOrderJoinDesc,
+                new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
+                new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null, null, false, null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
@@ -234,8 +234,8 @@
                 new int[] { 1 },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
-                new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null, custOrderJoinDesc,
-                true, nonMatchWriterFactories, 128, 128);
+                new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), custOrderJoinDesc, true,
+                nonMatchWriterFactories, 128, 128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -288,7 +288,7 @@
                         new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                         new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                         custOrderJoinDesc, new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
-                        new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, true,
+                        new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, null, true,
                         nonMatchWriterFactories);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
@@ -343,7 +343,7 @@
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 128,
-                null, 128);
+                128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -396,12 +396,12 @@
                 new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
 
-        OptimizedHybridHashJoinOperatorDescriptor join =
-                new OptimizedHybridHashJoinOperatorDescriptor(spec, 5, 20, 1.2, new int[] { 1 }, new int[] { 0 },
-                        new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
-                        new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
-                        custOrderJoinDesc, new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
-                        new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null, false, null);
+        OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 5, 20, 1.2,
+                new int[] { 1 }, new int[] { 0 },
+                new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
+                new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }, custOrderJoinDesc,
+                new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
+                new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null, null, false, null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
@@ -460,7 +460,7 @@
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 128,
-                null, 128);
+                128);
         PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -524,7 +524,7 @@
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                 new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 128,
-                null, 128);
+                128);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -581,7 +581,7 @@
                         new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                         new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                         custOrderJoinDesc, new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
-                        new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null);
+                        new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
@@ -629,7 +629,7 @@
                         new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                         new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                         custOrderJoinDesc, new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
-                        new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null);
+                        new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
@@ -678,7 +678,7 @@
                         new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                         new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                         custOrderJoinDesc, new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
-                        new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null);
+                        new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, null);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
 
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index 3bbeca8..a351c85 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -185,7 +185,7 @@
                     new IBinaryHashFunctionFactory[] {
                             PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
                     new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
-                    Common.custOrderJoinDesc, tableSize, null, memSize * frameSize);
+                    Common.custOrderJoinDesc, tableSize, memSize * frameSize);
 
         } else if ("hybrid".equalsIgnoreCase(algo)) {
             join = new OptimizedHybridHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceFactor,
@@ -194,7 +194,7 @@
                     new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                     Common.custOrderJoinDesc,
                     new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
-                    new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null);
+                    new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, null);
 
         } else {
             System.err.println("unknown algorithm:" + algo);