[ASTERIXDB-3004][RT] Improve hash join performance
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Improve hash join performance when joined values are NULL/MISSING
- Add SqlppHashJoinRQJTest to test different hash join scenarios
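The core of the change narrows IPredicateEvaluator from a tuple-pair check to a
single-tuple check, so tuples with NULL/MISSING join keys are filtered once per
side (at build/probe time) instead of being re-evaluated for every candidate pair.
A before/after sketch of the interface, taken from the diff below:

  // before: evaluated per candidate pair while joining
  boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1, int tupId1);

  // after: evaluated once per tuple on either side
  boolean evaluate(IFrameTupleAccessor fta, int tupId);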
Change-Id: I8f0afb05908e8281f2865775e074d459964fe989
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14784
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 2328bf4..ccdf620 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -73,11 +73,10 @@
}
integrationUtil.init(cleanup, configFile);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("initializing HDFS");
- }
-
if (startHdfs) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("initializing HDFS");
+ }
HDFSCluster.getInstance().setup();
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHashJoinRQJTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHashJoinRQJTest.java
new file mode 100644
index 0000000..40ee38e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHashJoinRQJTest.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.runtime;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.nio.IntBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import org.apache.asterix.api.http.server.QueryServiceRequestParameters;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.lang.sqlpp.parser.SqlppHint;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.ParameterTypeEnum;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.math3.random.MersenneTwister;
+import org.apache.commons.math3.random.RandomGenerator;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * RQG testsuite for hash joins.
+ * Tests:
+ * <ul>
+ * <li> Fields with / without NULL or MISSING values</li>
+ * <li> Inner / Left Outer joins</li>
+ * <li> Repartitioning / Broadcast joins </li>
+ * </ul>
+ */
+@RunWith(Parameterized.class)
+public class SqlppHashJoinRQJTest {
+
+ static final Logger LOGGER = LogManager.getLogger(SqlppHashJoinRQJTest.class);
+
+ static final String CONF_PROPERTY_SEED =
+ SqlppRQGTestBase.getConfigurationPropertyName(SqlppHashJoinRQJTest.class, "seed");
+ static final long CONF_PROPERTY_SEED_DEFAULT = System.currentTimeMillis();
+
+ static final String CONF_PROPERTY_LIMIT =
+ SqlppRQGTestBase.getConfigurationPropertyName(SqlppHashJoinRQJTest.class, "limit");
+ static final int CONF_PROPERTY_LIMIT_DEFAULT = 40;
+
+ static final String CONF_PROPERTY_OFFSET =
+ SqlppRQGTestBase.getConfigurationPropertyName(SqlppHashJoinRQJTest.class, "offset");
+ static final int CONF_PROPERTY_OFFSET_DEFAULT = 0;
+
+ static final Path OUTPUT_DIR = Paths.get("target", SqlppHashJoinRQJTest.class.getSimpleName());
+
+ static final String DATAVERSE_NAME = "dvTest";
+ static final String[] DATASET_NAMES = new String[] { "ds1", "ds2" };
+ static final String ID_COLUMN_NAME = "id";
+ static final String BASE_COLUMN_NAME = "i";
+ static final List<Integer> DATASET_ROWS = Arrays.asList(20000, 40000);
+ static final List<Integer> DATASET_COLUMNS = Arrays.asList(4, 10, 100, 1000, 10000);
+ static final int DATASET_COLUMN_LENGTH_MIN =
+ String.valueOf(DATASET_COLUMNS.stream().mapToInt(Integer::intValue).max().orElse(0)).length();
+ static final int DATASET_COLUMN_LENGTH_MAX = Math.max(20, DATASET_COLUMN_LENGTH_MIN);
+ static final int NULLABLE_COLUMN_RATIO = 2;
+ static final int OUTER_JOIN_RATIO = 3;
+ static final int BROADCAST_RATIO = 4;
+
+ static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ static final ObjectReader OBJECT_READER = OBJECT_MAPPER.readerFor(ObjectNode.class);
+
+ static long datasetRowCount;
+ static int datasetColumnLength;
+ static TestExecutor testExecutor;
+
+ final TestInstance testInstance;
+
+ public SqlppHashJoinRQJTest(TestInstance testInstance) {
+ this.testInstance = testInstance;
+ }
+
+ @Parameterized.Parameters(name = "SqlppHashJoinRQJTest {index}: {0}")
+ public static Collection<TestInstance> tests() {
+ long seed = SqlppRQGTestBase.getLongConfigurationProperty(CONF_PROPERTY_SEED, CONF_PROPERTY_SEED_DEFAULT);
+ int limit =
+ (int) SqlppRQGTestBase.getLongConfigurationProperty(CONF_PROPERTY_LIMIT, CONF_PROPERTY_LIMIT_DEFAULT);
+ int testOffset =
+ (int) SqlppRQGTestBase.getLongConfigurationProperty(CONF_PROPERTY_OFFSET, CONF_PROPERTY_OFFSET_DEFAULT);
+
+ LOGGER.info(String.format("Testsuite configuration: -D%s=%d -D%s=%d -D%s=%d", CONF_PROPERTY_SEED, seed,
+ CONF_PROPERTY_LIMIT, limit, CONF_PROPERTY_OFFSET, testOffset));
+
+ RandomGenerator random = new MersenneTwister(seed);
+ datasetRowCount = randomElement(DATASET_ROWS, random);
+ datasetColumnLength =
+ DATASET_COLUMN_LENGTH_MIN + random.nextInt(DATASET_COLUMN_LENGTH_MAX - DATASET_COLUMN_LENGTH_MIN);
+
+ LOGGER.info(String.format("Dataset row count=%d, column length=%d", datasetRowCount, datasetColumnLength));
+
+ LinkedHashMap<IntBuffer, TestInstance> testCases = new LinkedHashMap<>();
+ int i = 0;
+ while (i < limit) {
+ int c0 = randomElement(DATASET_COLUMNS, random);
+ boolean c0nullable = random.nextInt(NULLABLE_COLUMN_RATIO) == 0;
+ int c1 = randomElement(DATASET_COLUMNS, random);
+ boolean c1nullable = random.nextInt(NULLABLE_COLUMN_RATIO) == 0;
+ boolean outerJoin = random.nextInt(OUTER_JOIN_RATIO) == 0;
+ boolean broadcast = random.nextInt(BROADCAST_RATIO) == 0;
+ TestInstance test = new TestInstance(i, c0, c0nullable, c1, c1nullable, outerJoin, broadcast);
+ IntBuffer testSignature = test.signature();
+ if (testCases.containsKey(testSignature)) {
+ continue;
+ }
+ if (i >= testOffset) {
+ testCases.put(testSignature, test);
+ }
+ i++;
+ }
+ return testCases.values();
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ testExecutor = new TestExecutor();
+ LangExecutionUtil.setUp(SqlppRQGTestBase.TEST_CONFIG_FILE_NAME, testExecutor, false);
+
+ FileUtils.forceMkdir(OUTPUT_DIR.toFile());
+ for (String datasetName : DATASET_NAMES) {
+ Path datasetFilePath = OUTPUT_DIR.resolve(datasetName + ".adm");
+ LOGGER.info("Writing data file: " + datasetFilePath.toAbsolutePath());
+ try (PrintWriter pw = new PrintWriter(datasetFilePath.toFile())) {
+ for (int i = 0; i < datasetRowCount; i++) {
+ writeRecord(pw, datasetName, i);
+ }
+ }
+ }
+
+ StringBuilder sb = new StringBuilder(2048);
+ addDropDataverse(sb, DATAVERSE_NAME);
+ addCreateDataverse(sb, DATAVERSE_NAME);
+ for (String datasetName : DATASET_NAMES) {
+ addCreateDataset(sb, DATAVERSE_NAME, datasetName);
+ addLoadDataset(sb, DATAVERSE_NAME, datasetName);
+ }
+ executeUpdateOrDdl(sb.toString());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Test
+ public void test() throws Exception {
+ LOGGER.info(testInstance);
+ testInstance.execute();
+ }
+
+ private static void addDropDataverse(StringBuilder sb, String dataverseName) {
+ sb.append(String.format("DROP DATAVERSE %s IF EXISTS;\n", dataverseName));
+ }
+
+ private static void addCreateDataverse(StringBuilder sb, String dataverseName) {
+ sb.append(String.format("CREATE DATAVERSE %s;\n", dataverseName));
+ }
+
+ private static void addCreateDataset(StringBuilder sb, String dataverseName, String datasetName) {
+ sb.append("CREATE DATASET ").append(dataverseName).append('.').append(datasetName);
+ sb.append(" (").append(ID_COLUMN_NAME).append(" string");
+ sb.append(") ");
+ sb.append("OPEN TYPE PRIMARY KEY ").append(ID_COLUMN_NAME).append(";\n");
+ }
+
+ private static void addLoadDataset(StringBuilder sb, String dataverseName, String datasetName) {
+ sb.append(String.format(
+ "LOAD DATASET %s.%s USING localfs((`path`=`asterix_nc1://%s/%s.adm`),(`format`=`adm`));%n",
+ dataverseName, datasetName, OUTPUT_DIR, datasetName));
+ }
+
+ private static void writeRecord(PrintWriter pw, String datasetName, int id) throws IOException {
+ pw.print("{");
+ pw.print(String.format("\"%s\": \"%s:%d\"", ID_COLUMN_NAME, datasetName, id));
+ int nColumns = DATASET_COLUMNS.size();
+ for (int i = 0; i < nColumns; i++) {
+ long c = DATASET_COLUMNS.get(i);
+ writeColumn(pw, c, false, id); // no NULL/MISSING
+ writeColumn(pw, c, true, id); // with NULL/MISSING
+ }
+ pw.println("}");
+ }
+
+ private static String getColumnName(long c, boolean nullable) {
+ return BASE_COLUMN_NAME + c + (nullable ? "n" : "");
+ }
+
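+ /**
+ * For a nullable column with cardinality c, each id maps into 2*c slots: the first c
+ * slots produce actual values, and the remaining c alternate between NULL (even
+ * remainder) and MISSING (odd remainder). Hence half of the rows carry a value, a
+ * quarter are NULL, and a quarter are MISSING; the expected-count arithmetic in
+ * TestInstance.execute() relies on this distribution.
+ */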
+ private static void writeColumn(Appendable out, long c, boolean nullable, long id) throws IOException {
+ String columnName = getColumnName(c, nullable);
+ boolean isNull = false;
+ long v;
+ if (nullable) {
+ long r = id % (2 * c);
+ if (r < c) {
+ v = r + 1;
+ } else if (r % 2 == 0) {
+ v = 0;
+ isNull = true;
+ } else {
+ // MISSING -> nothing to do
+ return;
+ }
+ } else {
+ long r = id % c;
+ v = r + 1;
+ }
+ String text;
+ if (isNull) {
+ text = "null";
+ } else {
+ int cLen = datasetColumnLength;
+ StringBuilder textBuilder = new StringBuilder(cLen + 2);
+ textBuilder.append('"').append(v);
+ int pad = cLen - (textBuilder.length() - 1);
+ for (int i = 0; i < pad; i++) {
+ textBuilder.append(' ');
+ }
+ textBuilder.append('"');
+ text = textBuilder.toString();
+ }
+ out.append(String.format(",\"%s\":%s", columnName, text));
+ }
+
+ private static void executeUpdateOrDdl(String statement) throws Exception {
+ LOGGER.debug("Executing: " + statement);
+ testExecutor.executeSqlppUpdateOrDdl(statement, TestCaseContext.OutputFormat.CLEAN_JSON);
+ }
+
+ private static Pair<ArrayNode, String> executeQuery(String query, boolean fetchPlan) throws Exception {
+ LOGGER.debug("Executing: " + query);
+
+ List<TestCase.CompilationUnit.Parameter> params;
+ if (fetchPlan) {
+ TestCase.CompilationUnit.Parameter planParameter = new TestCase.CompilationUnit.Parameter();
+ planParameter.setName(QueryServiceRequestParameters.Parameter.OPTIMIZED_LOGICAL_PLAN.str());
+ planParameter.setValue(Boolean.TRUE.toString());
+ planParameter.setType(ParameterTypeEnum.STRING);
+ params = Collections.singletonList(planParameter);
+ } else {
+ params = Collections.emptyList();
+ }
+
+ try (InputStream resultStream = testExecutor.executeQueryService(query, TestCaseContext.OutputFormat.CLEAN_JSON,
+ testExecutor.getEndpoint(Servlets.QUERY_SERVICE), params, true, StandardCharsets.UTF_8)) {
+ JsonNode r = OBJECT_READER.readTree(resultStream);
+ JsonNode errors = r.get("errors");
+ if (errors != null) {
+ Assert.fail("Query failed: " + errors);
+ }
+ JsonNode results = r.get("results");
+ if (!results.isArray()) {
+ Assert.fail("Expected array result, got: " + results);
+ }
+ ArrayNode resultsArray = (ArrayNode) results;
+ String plan = fetchPlan ? r.get("plans").get("optimizedLogicalPlan").asText() : null;
+ return new Pair<>(resultsArray, plan);
+ }
+ }
+
+ private static <T> T randomElement(List<T> list, RandomGenerator randomGenerator) {
+ return list.get(randomGenerator.nextInt(list.size()));
+ }
+
+ private static class TestInstance {
+
+ private final int id;
+
+ private final int c0;
+ private final int c1;
+ private final boolean c0nullable;
+ private final boolean c1nullable;
+ private final String col0;
+ private final String col1;
+ private final boolean outerJoin;
+ private final boolean broadcastJoin;
+
+ public TestInstance(int id, int c0, boolean c0nullable, int c1, boolean c1nullable, boolean outerJoin,
+ boolean broadcastJoin) {
+ this.id = id;
+ this.outerJoin = outerJoin;
+ this.c0 = c0;
+ this.c1 = c1;
+ this.c0nullable = c0nullable;
+ this.c1nullable = c1nullable;
+ this.broadcastJoin = broadcastJoin;
+ this.col0 = getColumnName(c0, c0nullable);
+ this.col1 = getColumnName(c1, c1nullable);
+ }
+
+ IntBuffer signature() {
+ return IntBuffer.wrap(
+ new int[] { c0, toInt(c0nullable), c1, toInt(c1nullable), toInt(outerJoin), toInt(broadcastJoin) });
+ }
+
+ void execute() throws Exception {
+ String query = createQuery();
+ Pair<ArrayNode, String> res = executeQuery(query, true);
+ String plan = res.second;
+ if (!plan.contains(PhysicalOperatorTag.HYBRID_HASH_JOIN.toString())) {
+ Assert.fail(PhysicalOperatorTag.HYBRID_HASH_JOIN + " operator was not used in query plan " + plan);
+ }
+ if (broadcastJoin && !plan.contains(PhysicalOperatorTag.BROADCAST_EXCHANGE.toString())) {
+ Assert.fail(PhysicalOperatorTag.BROADCAST_EXCHANGE + " operator was not used in query plan " + plan);
+ }
+ ArrayNode resultArray = res.first;
+
+ long expectedRowCount;
+ long expectedRowCountInnerJoin = Math.min(c0, c1);
+ if (outerJoin) {
+ expectedRowCount = expectedRowCountInnerJoin + (c0nullable ? 2 : 0) + Math.max(0, c0 - c1);
+ } else {
+ expectedRowCount = expectedRowCountInnerJoin;
+ }
+
+ long expectedAggCountInnerJoin = (datasetRowCount * datasetRowCount) / (((long) c0) * c1)
+ / (c0nullable ? 2 : 1) / (c1nullable ? 2 : 1);
+
+ int actualRowCount = resultArray.size();
+ if (actualRowCount != expectedRowCount) {
+ String commentHash = String.format("%s;%s", this, query);
+ File fHash = SqlppRQGTestBase.writeResult(OUTPUT_DIR, resultArray, id, "hash", commentHash);
+ Assert.fail(String.format("Unexpected row count %d for query #%d [%s]. Expected row count: %d %n %s ",
+ actualRowCount, id, this, expectedRowCount, fHash.getAbsolutePath()));
+ }
+
+ String col0Alias = String.format("%s_%s", DATASET_NAMES[0], col0);
+
+ for (int i = 0; i < actualRowCount; i++) {
+ JsonNode resultRecord = resultArray.get(i);
+ long actualAggCount = resultRecord.get("cnt").longValue();
+
+ long expectedAggCount;
+ if (outerJoin) {
+ JsonNode col0Node = resultRecord.get(col0Alias);
+ if (col0Node == null || col0Node.isNull()) {
+ expectedAggCount = datasetRowCount / 4;
+ } else {
+ if (getValueAsLong(col0Node) > c1) {
+ expectedAggCount = datasetRowCount / (c0nullable ? 2 : 1) / c0;
+ } else {
+ expectedAggCount = expectedAggCountInnerJoin;
+ }
+ }
+ } else {
+ expectedAggCount = expectedAggCountInnerJoin;
+ }
+
+ if (actualAggCount != expectedAggCount) {
+ String commentHash = String.format("%s;%s", this, query);
+ File fHash = SqlppRQGTestBase.writeResult(OUTPUT_DIR, resultArray, id, "hash", commentHash);
+ Assert.fail(String.format(
+ "Unexpected agg count %d in row %d for query #%d [%s]. Expected agg count: %d %n %s ",
+ actualAggCount, i, id, this, expectedAggCount, fHash.getAbsolutePath()));
+ }
+ }
+ }
+
+ private long getValueAsLong(JsonNode node) throws Exception {
+ String text = node.textValue().trim();
+ if (text.isEmpty()) {
+ throw new Exception("Unexpected result value: " + node);
+ }
+ try {
+ return Long.parseLong(text);
+ } catch (NumberFormatException e) {
+ throw new Exception("Unexpected result value: " + node);
+ }
+ }
+
+ String createQuery() {
+ return String.format(
+ "USE %s; SELECT t1.%s AS %s_%s, t2.%s AS %s_%s, count(*) AS cnt FROM %s t1 %s JOIN %s t2 ON t1.%s /*%s*/ = t2.%s /*%s*/ GROUP BY t1.%s, t2.%s ORDER BY t1.%s, t2.%s",
+ DATAVERSE_NAME, col0, DATASET_NAMES[0], col0, col1, DATASET_NAMES[1], col1, DATASET_NAMES[0],
+ getJoinKind(), DATASET_NAMES[1], col0, getJoinHint(), col1, getGroupByHint(), col0, col1, col0,
+ col1);
+ }
+
+ private String getJoinKind() {
+ return outerJoin ? "LEFT OUTER" : "INNER";
+ }
+
+ private String getJoinHint() {
+ return broadcastJoin ? "+" + SqlppHint.HASH_BROADCAST_JOIN_HINT.getIdentifier() : "";
+ }
+
+ private String getGroupByHint() {
+ return "";
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s ON %s=%s %s", getJoinKind(), col0, col1, getJoinHint());
+ }
+
+ static int toInt(boolean b) {
+ return b ? 1 : 0;
+ }
+ }
+}
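Note: tests() logs the exact run configuration at startup (the "Testsuite configuration:
-D...=..." line), so a failing run can be reproduced by passing the logged seed/limit/offset
-D flags back to the JVM; the concrete property names come from
SqlppRQGTestBase.getConfigurationPropertyName and are not repeated here.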
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppNumericIndexRQGTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppNumericIndexRQGTest.java
index 5201681..e97a389 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppNumericIndexRQGTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppNumericIndexRQGTest.java
@@ -127,7 +127,7 @@
@BeforeClass
public static void setUp() throws Exception {
testExecutor = new TestExecutor();
- LangExecutionUtil.setUp(SqlppRQGTestBase.TEST_CONFIG_FILE_NAME, testExecutor);
+ LangExecutionUtil.setUp(SqlppRQGTestBase.TEST_CONFIG_FILE_NAME, testExecutor, false);
StringBuilder sb = new StringBuilder(2048);
addDropDataverse(sb, DATAVERSE_NAME);
@@ -293,7 +293,7 @@
}
}
sb.append(") ");
- sb.append("OPEN TYPE PRIMARY KEY id;\n");
+ sb.append("OPEN TYPE PRIMARY KEY ").append(ID_COLUMN_NAME).append(";\n");
}
private static void addLoadDataset(StringBuilder sb, String dataverseName, String datasetName) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppRQGTestBase.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppRQGTestBase.java
index ec4b55f..dbf214b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppRQGTestBase.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppRQGTestBase.java
@@ -381,7 +381,7 @@
protected static void startAsterix() throws Exception {
testExecutor = new TestExecutor();
- LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor, false);
loadAsterixData();
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.1.query.sqlpp
new file mode 100644
index 0000000..337226d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.1.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test hash join when values on both sides are MISSING
+ */
+
+with
+R as (
+ from range(1, 50000) r
+ select (case when get_year(current_date()) > 0 then missing else r end) as r
+),
+
+S as (
+ from range(1, 50000) s
+ select (case when get_year(current_date()) > 0 then missing else s end) as s
+)
+
+select count(*) cnt
+from R, S
+where R.r = S.s;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.2.query.sqlpp
new file mode 100644
index 0000000..94de090
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.2.query.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test hash join when values on both sides are NULL
+ */
+
+with
+R as (
+ from range(1, 50000) r
+ select (case when get_year(current_date()) > 0 then null else r end) as r
+),
+
+S as (
+ from range(1, 50000) s
+ select (case when get_year(current_date()) > 0 then null else s end) as s
+)
+
+select count(*) cnt
+from R, S
+where R.r = S.s;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.1.adm
new file mode 100644
index 0000000..bacb60c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.1.adm
@@ -0,0 +1 @@
+{ "cnt": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.2.adm
new file mode 100644
index 0000000..bacb60c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.2.adm
@@ -0,0 +1 @@
+{ "cnt": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index d400c27..c1aed2a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6630,6 +6630,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="join">
+ <compilation-unit name="hash_join_missing">
+ <output-dir compare="Text">hash_join_missing</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="join">
<compilation-unit name="hash_join_record">
<output-dir compare="Text">hash_join_record</output-dir>
</compilation-unit>
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/PredicateEvaluatorFactoryProvider.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/PredicateEvaluatorFactoryProvider.java
index ab9f4e0..286d1cc 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/PredicateEvaluatorFactoryProvider.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/PredicateEvaluatorFactoryProvider.java
@@ -26,31 +26,24 @@
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
-/*
-Provides PredicateEvaluator for equi-join cases to properly take care of NULL fields, being compared with each other.
-If any of the join keys, from either side, is NULL, record should not pass equi-join condition.
-*/
+/**
+ * Provides a PredicateEvaluator for equi-join cases to disqualify tuples that have NULL/MISSING fields.
+ * If any of the join keys, from either side, is NULL/MISSING, the tuple does not pass the equi-join condition.
+ */
public class PredicateEvaluatorFactoryProvider implements IPredicateEvaluatorFactoryProvider {
private static final long serialVersionUID = 1L;
public static final PredicateEvaluatorFactoryProvider INSTANCE = new PredicateEvaluatorFactoryProvider();
@Override
- public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(final int[] keys0, final int[] keys1) {
+ public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(final int[] keys) {
return new IPredicateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@Override
public IPredicateEvaluator createPredicateEvaluator() {
- return new IPredicateEvaluator() {
-
- @Override
- public boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1,
- int tupId1) {
- return noNullOrMissingInKeys(fta0, tupId0, keys0) && noNullOrMissingInKeys(fta1, tupId1, keys1);
- }
- };
+ return (fta, tupId) -> noNullOrMissingInKeys(fta, tupId, keys);
}
};
}
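Note: noNullOrMissingInKeys is an existing helper in this class and is not part of the
diff. A minimal sketch of the kind of per-tuple check it performs against the new
single-tuple interface (illustrative assumptions only, not the actual implementation):

  // hypothetical sketch: reject a tuple whose join keys include a NULL or MISSING value,
  // assuming the first byte of each serialized field is its ATypeTag
  private static boolean noNullOrMissingInKeys(IFrameTupleAccessor fta, int tupId, int[] keys) {
      for (int key : keys) {
          int offset = fta.getAbsoluteFieldStartOffset(tupId, key);
          byte typeTag = fta.getBuffer().get(offset);
          if (typeTag == ATypeTag.SERIALIZED_NULL_TYPE_TAG
                  || typeTag == ATypeTag.SERIALIZED_MISSING_TYPE_TAG) {
              return false; // a NULL/MISSING key can never satisfy an equi-join
          }
      }
      return true;
  }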
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 7e7d012..cc89deb 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -109,10 +109,11 @@
IBinaryHashFunctionFamily[] rightHashFunFamilies =
JobGenHelper.variablesToBinaryHashFunctionFamilies(keysRightBranch, env, context);
- IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
- context.getPredicateEvaluatorFactoryProvider();
- IPredicateEvaluatorFactory predEvaluatorFactory = predEvaluatorFactoryProvider == null ? null
- : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
+ IPredicateEvaluatorFactoryProvider predEvalFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
+ IPredicateEvaluatorFactory leftPredEvalFactory =
+ predEvalFactoryProvider == null ? null : predEvalFactoryProvider.getPredicateEvaluatorFactory(keysLeft);
+ IPredicateEvaluatorFactory rightPredEvalFactory = predEvalFactoryProvider == null ? null
+ : predEvalFactoryProvider.getPredicateEvaluatorFactory(keysRight);
RecordDescriptor recDescriptor =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
@@ -131,7 +132,7 @@
opDesc = generateOptimizedHashJoinRuntime(context, joinOp, inputSchemas, keysLeft, keysRight,
leftHashFunFamilies, rightHashFunFamilies, comparatorFactory, reverseComparatorFactory,
- predEvaluatorFactory, recDescriptor, spec);
+ leftPredEvalFactory, rightPredEvalFactory, recDescriptor, spec);
opDesc.setSourceLocation(op.getSourceLocation());
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
@@ -145,20 +146,20 @@
AbstractBinaryJoinOperator joinOp, IOperatorSchema[] inputSchemas, int[] keysLeft, int[] keysRight,
IBinaryHashFunctionFamily[] leftHashFunFamilies, IBinaryHashFunctionFamily[] rightHashFunFamilies,
ITuplePairComparatorFactory comparatorFactory, ITuplePairComparatorFactory reverseComparatorFactory,
- IPredicateEvaluatorFactory predEvaluatorFactory, RecordDescriptor recDescriptor,
- IOperatorDescriptorRegistry spec) throws AlgebricksException {
+ IPredicateEvaluatorFactory leftPredEvalFactory, IPredicateEvaluatorFactory rightPredEvalFactory,
+ RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) throws AlgebricksException {
int memSizeInFrames = localMemoryRequirements.getMemoryBudgetInFrames();
switch (kind) {
case INNER:
return new OptimizedHybridHashJoinOperatorDescriptor(spec, memSizeInFrames, maxInputBuildSizeInFrames,
getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies, rightHashFunFamilies, recDescriptor,
- comparatorFactory, reverseComparatorFactory, predEvaluatorFactory);
+ comparatorFactory, reverseComparatorFactory, leftPredEvalFactory, rightPredEvalFactory);
case LEFT_OUTER:
IMissingWriterFactory[] nonMatchWriterFactories = JobGenHelper.createMissingWriterFactories(context,
((LeftOuterJoinOperator) joinOp).getMissingValue(), inputSchemas[1].getSize());
return new OptimizedHybridHashJoinOperatorDescriptor(spec, memSizeInFrames, maxInputBuildSizeInFrames,
getFudgeFactor(), keysLeft, keysRight, leftHashFunFamilies, rightHashFunFamilies, recDescriptor,
- comparatorFactory, reverseComparatorFactory, predEvaluatorFactory, true,
+ comparatorFactory, reverseComparatorFactory, leftPredEvalFactory, rightPredEvalFactory, true,
nonMatchWriterFactories);
default:
throw new NotImplementedException();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index 84dda11..b96b887 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -44,8 +44,6 @@
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -93,11 +91,6 @@
IBinaryHashFunctionFactory[] rightHashFunFactories =
JobGenHelper.variablesToBinaryHashFunctionFactories(keysRightBranch, env, context);
- IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
- context.getPredicateEvaluatorFactoryProvider();
- IPredicateEvaluatorFactory predEvaluatorFactory = predEvaluatorFactoryProvider == null ? null
- : predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
-
RecordDescriptor recDescriptor =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
@@ -116,15 +109,14 @@
switch (kind) {
case INNER:
opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, leftHashFunFactories,
- rightHashFunFactories, comparatorFactory, recDescriptor, tableSize, predEvaluatorFactory,
- memSizeInFrames);
+ rightHashFunFactories, comparatorFactory, recDescriptor, tableSize, memSizeInFrames);
break;
case LEFT_OUTER:
IMissingWriterFactory[] nonMatchWriterFactories = JobGenHelper.createMissingWriterFactories(context,
((LeftOuterJoinOperator) joinOp).getMissingValue(), inputSchemas[1].getSize());
opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, leftHashFunFactories,
- rightHashFunFactories, comparatorFactory, predEvaluatorFactory, recDescriptor, true,
- nonMatchWriterFactories, tableSize, memSizeInFrames);
+ rightHashFunFactories, comparatorFactory, recDescriptor, true, nonMatchWriterFactories,
+ tableSize, memSizeInFrames);
break;
default:
throw new NotImplementedException();
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluator.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluator.java
index 5fbc1ab..20381f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluator.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluator.java
@@ -21,9 +21,6 @@
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-/*
- * Compares two tuples to make sure that records, whose comparison keys are NULL do not pass comparator filter
- */
public interface IPredicateEvaluator {
- public boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1, int tupId1);
+ boolean evaluate(IFrameTupleAccessor fta, int tupId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
index eb94514..8748587 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
@@ -22,9 +22,8 @@
import java.io.Serializable;
/*
- * Provides PredicateEvaluator for equi-join related operators
+ * Provides a PredicateEvaluator
*/
-
public interface IPredicateEvaluatorFactory extends Serializable {
- public IPredicateEvaluator createPredicateEvaluator();
+ IPredicateEvaluator createPredicateEvaluator();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
index 3eefc29..bc3e5c8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
@@ -24,7 +24,6 @@
/*
* Provides PredicateEvaluatorFactory based on (equi-join) keys
*/
-
public interface IPredicateEvaluatorFactoryProvider extends Serializable {
- public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys0, int[] keys1);
+ IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinUtil.java
index 398dee9..7cee242 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinUtil.java
@@ -28,16 +28,21 @@
private HybridHashJoinUtil() {
}
+ public enum SIDE {
+ BUILD,
+ PROBE
+ }
+
/**
* Prints out the detailed information for partitions: in-memory and spilled partitions.
* This method exists for a debug purpose.
*/
- public String printPartitionInfo(BitSet spilledStatus, OptimizedHybridHashJoin.SIDE whichSide, int numOfPartitions,
- int[] probePSizeInTups, int[] buildPSizeInTups, RunFileWriter[] probeRFWriters,
- RunFileWriter[] buildRFWriters, IPartitionedTupleBufferManager bufferManager) {
+ public String printPartitionInfo(BitSet spilledStatus, SIDE whichSide, int numOfPartitions, int[] probePSizeInTups,
+ int[] buildPSizeInTups, RunFileWriter[] probeRFWriters, RunFileWriter[] buildRFWriters,
+ IPartitionedTupleBufferManager bufferManager) {
StringBuilder buf = new StringBuilder();
buf.append(">>> " + this + " " + Thread.currentThread().getId() + " printInfo():" + "\n");
- if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
+ if (whichSide == SIDE.BUILD) {
buf.append("BUILD side" + "\n");
} else {
buf.append("PROBE side" + "\n");
@@ -49,7 +54,7 @@
int spilledPartByteSize = 0;
for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid < numOfPartitions; pid =
spilledStatus.nextSetBit(pid + 1)) {
- if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
+ if (whichSide == SIDE.BUILD) {
spilledTupleCount += buildPSizeInTups[pid];
spilledPartByteSize += buildRFWriters[pid].getFileSize();
buf.append("part:\t" + pid + "\t#tuple:\t" + buildPSizeInTups[pid] + "\tsize(MB):\t"
@@ -70,7 +75,7 @@
int inMemoryPartByteSize = 0;
for (int pid = spilledStatus.nextClearBit(0); pid >= 0 && pid < numOfPartitions; pid =
spilledStatus.nextClearBit(pid + 1)) {
- if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
+ if (whichSide == SIDE.BUILD) {
inMemoryTupleCount += buildPSizeInTups[pid];
inMemoryPartByteSize += bufferManager.getPhysicalSize(pid);
} else {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index cb63b6a..4c2019e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -28,7 +28,6 @@
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -59,7 +58,6 @@
private final ISerializableTable table;
private final TuplePointer storedTuplePointer;
private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
- private final IPredicateEvaluator predEvaluator;
private final TupleInFrameListAccessor tupleAccessor;
// To release frames
private final ISimpleFrameBufferManager bufferManager;
@@ -70,17 +68,16 @@
public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe,
ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
- ISerializableTable table, IPredicateEvaluator predEval, ISimpleFrameBufferManager bufferManager)
- throws HyracksDataException {
+ ISerializableTable table, ISimpleFrameBufferManager bufferManager) throws HyracksDataException {
this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild, isLeftOuter, missingWritersBuild, table,
- predEval, false, bufferManager);
+ false, bufferManager);
}
public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor accessorProbe,
ITuplePartitionComputer tpcProbe, FrameTupleAccessor accessorBuild, RecordDescriptor rDBuild,
ITuplePartitionComputer tpcBuild, boolean isLeftOuter, IMissingWriter[] missingWritersBuild,
- ISerializableTable table, IPredicateEvaluator predEval, boolean reverse,
- ISimpleFrameBufferManager bufferManager) throws HyracksDataException {
+ ISerializableTable table, boolean reverse, ISimpleFrameBufferManager bufferManager)
+ throws HyracksDataException {
this.table = table;
storedTuplePointer = new TuplePointer();
buffers = new ArrayList<>();
@@ -89,7 +86,6 @@
this.accessorProbe = accessorProbe;
this.tpcProbe = tpcProbe;
appender = new FrameTupleAppender(new VSizeFrame(ctx));
- predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
if (isLeftOuter) {
int fieldCountOuter = accessorBuild.getFieldCount();
@@ -178,11 +174,8 @@
accessorBuild.reset(buffers.get(bIndex));
int c = tpComparator.compare(accessorProbe, tid, accessorBuild, tIndex);
if (c == 0) {
- boolean predEval = evaluatePredicate(tid, tIndex);
- if (predEval) {
- matchFound = true;
- appendToResult(tid, tIndex, writer);
- }
+ matchFound = true;
+ appendToResult(tid, tIndex, writer);
}
}
}
@@ -228,14 +221,6 @@
table.close();
}
- private boolean evaluatePredicate(int tIx1, int tIx2) {
- if (reverseOutputOrder) { //Role Reversal Optimization is triggered
- return (predEvaluator == null) || predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1);
- } else {
- return (predEvaluator == null) || predEvaluator.evaluate(accessorProbe, tIx1, accessorBuild, tIx2);
- }
- }
-
private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException {
if (reverseOutputOrder) {
FrameUtils.appendConcatToWriter(writer, appender, accessorBuild, buildSidetIx, accessorProbe, probeSidetIx);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 33976a8..f89ccb0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -32,8 +32,6 @@
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -63,7 +61,6 @@
private final IBinaryHashFunctionFactory[] hashFunctionFactories0;
private final IBinaryHashFunctionFactory[] hashFunctionFactories1;
private final ITuplePairComparatorFactory comparatorFactory;
- private final IPredicateEvaluatorFactory predEvaluatorFactory;
private final boolean isLeftOuter;
private final IMissingWriterFactory[] nonMatchWriterFactories;
private final int tableSize;
@@ -73,23 +70,21 @@
public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
IBinaryHashFunctionFactory[] hashFunctionFactories0, IBinaryHashFunctionFactory[] hashFunctionFactories1,
ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int tableSize,
- IPredicateEvaluatorFactory predEvalFactory, int memSizeInFrames) {
- this(spec, keys0, keys1, hashFunctionFactories0, hashFunctionFactories1, comparatorFactory, predEvalFactory,
- recordDescriptor, false, null, tableSize, memSizeInFrames);
+ int memSizeInFrames) {
+ this(spec, keys0, keys1, hashFunctionFactories0, hashFunctionFactories1, comparatorFactory, recordDescriptor,
+ false, null, tableSize, memSizeInFrames);
}
public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
IBinaryHashFunctionFactory[] hashFunctionFactories0, IBinaryHashFunctionFactory[] hashFunctionFactories1,
- ITuplePairComparatorFactory comparatorFactory, IPredicateEvaluatorFactory predEvalFactory,
- RecordDescriptor recordDescriptor, boolean isLeftOuter, IMissingWriterFactory[] missingWriterFactories1,
- int tableSize, int memSizeInFrames) {
+ ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+ IMissingWriterFactory[] missingWriterFactories1, int tableSize, int memSizeInFrames) {
super(spec, 2, 1);
this.keys0 = keys0;
this.keys1 = keys1;
this.hashFunctionFactories0 = hashFunctionFactories0;
this.hashFunctionFactories1 = hashFunctionFactories1;
this.comparatorFactory = comparatorFactory;
- this.predEvaluatorFactory = predEvalFactory;
this.outRecDescs[0] = recordDescriptor;
this.isLeftOuter = isLeftOuter;
this.nonMatchWriterFactories = missingWriterFactories1;
@@ -159,8 +154,6 @@
} else {
nullWriters1 = null;
}
- final IPredicateEvaluator predEvaluator =
- (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
final int memSizeInBytes = memSizeInFrames * jobletCtx.getInitialFrameSize();
final IDeallocatableFramePool framePool = new DeallocatableFramePool(jobletCtx, memSizeInBytes);
@@ -178,8 +171,7 @@
state = new HashBuildTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
ISerializableTable table = new SerializableHashTable(tableSize, jobletCtx, bufferManager);
state.joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(rd0), hpc0,
- new FrameTupleAccessor(rd1), rd1, hpc1, isLeftOuter, nullWriters1, table, predEvaluator,
- bufferManager);
+ new FrameTupleAccessor(rd1), rd1, hpc1, isLeftOuter, nullWriters1, table, bufferManager);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 03ff72f..414ae25 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -27,7 +27,6 @@
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -63,22 +62,20 @@
private final RunFileWriter runFileWriter;
private final boolean isLeftOuter;
private final ArrayTupleBuilder missingTupleBuilder;
- private final IPredicateEvaluator predEvaluator;
- // Added for handling correct calling for predicate-evaluator upon recursive calls
+ // Added to correctly handle recursive calls
// (in OptimizedHybridHashJoin) that cause role-reversal
private final boolean isReversed;
private final BufferInfo tempInfo = new BufferInfo(null, -1, -1);
private final BitSet outerMatchLOJ;
public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter,
- FrameTupleAccessor accessorInner, int memBudgetInFrames, IPredicateEvaluator predEval, boolean isLeftOuter,
+ FrameTupleAccessor accessorInner, int memBudgetInFrames, boolean isLeftOuter,
IMissingWriter[] missingWriters) throws HyracksDataException {
- this(jobletContext, accessorOuter, accessorInner, memBudgetInFrames, predEval, isLeftOuter, missingWriters,
- false);
+ this(jobletContext, accessorOuter, accessorInner, memBudgetInFrames, isLeftOuter, missingWriters, false);
}
public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter,
- FrameTupleAccessor accessorInner, int memBudgetInFrames, IPredicateEvaluator predEval, boolean isLeftOuter,
+ FrameTupleAccessor accessorInner, int memBudgetInFrames, boolean isLeftOuter,
IMissingWriter[] missingWriters, boolean isReversed) throws HyracksDataException {
this.accessorInner = accessorInner;
this.accessorOuter = accessorOuter;
@@ -97,7 +94,6 @@
new VariableFramePool(jobletContext, outerBufferMngrMemBudgetInBytes), FrameFreeSlotPolicyFactory
.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, outerBufferMngrMemBudgetInFrames));
- this.predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
if (isLeftOuter) {
if (isReversed) {
@@ -202,8 +198,7 @@
boolean matchFound = false;
for (int j = 0; j < innerTupleCount; ++j) {
int c = tpComparator.compare(accessorOuter, i, accessorInner, j);
- boolean prdEval = evaluatePredicate(i, j);
- if (c == 0 && prdEval) {
+ if (c == 0) {
matchFound = true;
appendToResults(i, j, writer);
}
@@ -214,14 +209,6 @@
}
}
- private boolean evaluatePredicate(int tIx1, int tIx2) {
- if (isReversed) { //Role Reversal Optimization is triggered
- return ((predEvaluator == null) || predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1));
- } else {
- return ((predEvaluator == null) || predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2));
- }
- }
-
private void appendToResults(int outerTupleId, int innerTupleId, IFrameWriter writer) throws HyracksDataException {
if (isReversed) {
appendResultToFrame(accessorInner, innerTupleId, accessorOuter, outerTupleId, writer);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 1de8094..24e1b45 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -29,8 +29,6 @@
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -52,25 +50,16 @@
private static final long serialVersionUID = 1L;
private final ITuplePairComparatorFactory comparatorFactory;
private final int memSize;
- private final IPredicateEvaluatorFactory predEvaluatorFactory;
private final boolean isLeftOuter;
private final IMissingWriterFactory[] nullWriterFactories1;
public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
- this(spec, comparatorFactory, recordDescriptor, memSize, null, isLeftOuter, nullWriterFactories1);
- }
-
- public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
- ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
- IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter,
- IMissingWriterFactory[] nullWriterFactories1) {
super(spec, 2, 1);
this.comparatorFactory = comparatorFactory;
this.outRecDescs[0] = recordDescriptor;
this.memSize = memSize;
- this.predEvaluatorFactory = predEvalFactory;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
}
@@ -117,8 +106,6 @@
final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- final IPredicateEvaluator predEvaluator =
- (predEvaluatorFactory != null) ? predEvaluatorFactory.createPredicateEvaluator() : null;
final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
@@ -134,8 +121,7 @@
public void open() throws HyracksDataException {
state = new JoinCacheTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
state.joiner = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(rd0),
- new FrameTupleAccessor(rd1), memSize, predEvaluator, isLeftOuter, nullWriters1);
-
+ new FrameTupleAccessor(rd1), memSize, isLeftOuter, nullWriters1);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index f5eb4f7..7a9bb25 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -59,11 +59,6 @@
// Used for special probe BigObject which can not be held into the Join memory
private FrameTupleAppender bigFrameAppender;
- public enum SIDE {
- BUILD,
- PROBE
- }
-
private final IHyracksJobletContext jobletCtx;
private final String buildRelName;
@@ -74,7 +69,8 @@
private final RecordDescriptor probeRd;
private final RunFileWriter[] buildRFWriters; //writing spilled build partitions
private final RunFileWriter[] probeRFWriters; //writing spilled probe partitions
- private final IPredicateEvaluator predEvaluator;
+ private final IPredicateEvaluator buildPredEval;
+ private final IPredicateEvaluator probePredEval;
private final boolean isLeftOuter;
private final IMissingWriter[] nonMatchWriters;
private final BitSet spilledStatus; //0=resident, 1=spilled
@@ -98,8 +94,8 @@
public OptimizedHybridHashJoin(IHyracksJobletContext jobletCtx, int memSizeInFrames, int numOfPartitions,
String probeRelName, String buildRelName, RecordDescriptor probeRd, RecordDescriptor buildRd,
- ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval,
- boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
+ ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator probePredEval,
+ IPredicateEvaluator buildPredEval, boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1) {
this.jobletCtx = jobletCtx;
this.memSizeInFrames = memSizeInFrames;
this.buildRd = buildRd;
@@ -113,8 +109,12 @@
this.probeRFWriters = new RunFileWriter[numOfPartitions];
this.accessorBuild = new FrameTupleAccessor(buildRd);
this.accessorProbe = new FrameTupleAccessor(probeRd);
- this.predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
+ if (isLeftOuter && probePredEval != null) {
+ throw new IllegalStateException();
+ }
+ this.buildPredEval = buildPredEval;
+ this.probePredEval = probePredEval;
this.isReversed = false;
this.spilledStatus = new BitSet(numOfPartitions);
this.nonMatchWriters = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null;
@@ -141,11 +141,12 @@
accessorBuild.reset(buffer);
int tupleCount = accessorBuild.getTupleCount();
for (int i = 0; i < tupleCount; ++i) {
- int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
- processTupleBuildPhase(i, pid);
- buildPSizeInTups[pid]++;
+ if (buildPredEval == null || buildPredEval.evaluate(accessorBuild, i)) {
+ int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
+ processTupleBuildPhase(i, pid);
+ buildPSizeInTups[pid]++;
+ }
}
-
}
private void processTupleBuildPhase(int tid, int pid) throws HyracksDataException {
@@ -217,8 +218,8 @@
ISerializableTable table = new SerializableHashTable(inMemTupCount, jobletCtx, bufferManagerForHashTable);
this.inMemJoiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRd), probeHpc,
- new FrameTupleAccessor(buildRd), buildRd, buildHpc, isLeftOuter, nonMatchWriters, table, predEvaluator,
- isReversed, bufferManagerForHashTable);
+ new FrameTupleAccessor(buildRd), buildRd, buildHpc, isLeftOuter, nonMatchWriters, table, isReversed,
+ bufferManagerForHashTable);
buildHashTable();
}
@@ -473,22 +474,28 @@
public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
accessorProbe.reset(buffer);
int tupleCount = accessorProbe.getTupleCount();
-
- if (isBuildRelAllInMemory()) {
- inMemJoiner.join(buffer, writer);
- return;
- }
inMemJoiner.resetAccessorProbe(accessorProbe);
- for (int i = 0; i < tupleCount; ++i) {
- int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
-
- if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase
- if (spilledStatus.get(pid)) { //pid is Spilled
- processTupleProbePhase(i, pid);
- } else { //pid is Resident
+ if (isBuildRelAllInMemory()) {
+ for (int i = 0; i < tupleCount; ++i) {
+ // NOTE: probePredEval is guaranteed to be null for outer joins and when roles are reversed
+ if (probePredEval == null || probePredEval.evaluate(accessorProbe, i)) {
inMemJoiner.join(i, writer);
}
- probePSizeInTups[pid]++;
+ }
+ } else {
+ for (int i = 0; i < tupleCount; ++i) {
+ // NOTE: probePredEval is guaranteed to be null for outer joins and when roles are reversed
+ if (probePredEval == null || probePredEval.evaluate(accessorProbe, i)) {
+ int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
+ if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has potential match from previous phase
+ if (spilledStatus.get(pid)) { //pid is Spilled
+ processTupleProbePhase(i, pid);
+ } else { //pid is Resident
+ inMemJoiner.join(i, writer);
+ }
+ probePSizeInTups[pid]++;
+ }
+ }
}
}
}
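The probe path now mirrors the build path. When the build relation is entirely resident, the fast path still applies the probe predicate tuple by tuple before handing off to the in-memory joiner; otherwise each surviving tuple is partitioned and either joined against a resident partition or spilled. Either way, a probe tuple with a NULL/MISSING key is rejected before any hashing or spilling work is done. This is safe precisely because of the invariant the NOTE comments state: probePredEval is forced to null for outer joins and under role reversal, so no tuple that the join semantics require can be filtered out here.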
@@ -600,7 +607,10 @@
return bufferManager.getPhysicalSize(pid);
}
- public void setIsReversed(boolean b) {
- this.isReversed = b;
+ public void setIsReversed(boolean reversed) {
+ if (reversed && (buildPredEval != null || probePredEval != null)) {
+ throw new IllegalStateException("cannot reverse build/probe roles when a predicate is set");
+ }
+ this.isReversed = reversed;
}
}
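setIsReversed now rejects reversal whenever either predicate is set: once build and probe swap roles, a build-side filter would run against probe tuples (and vice versa), silently changing the join result. This is consistent with the recursive re-join below, which constructs its inner OptimizedHybridHashJoin with null for both predicates before calling setIsReversed(isReversed).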
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index dcccd61..555e8fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -131,7 +131,8 @@
private final IBinaryHashFunctionFamily[] buildHashFunctionFactories;
private final ITuplePairComparatorFactory tuplePairComparatorFactoryProbe2Build; //For HHJ & NLJ in probe
private final ITuplePairComparatorFactory tuplePairComparatorFactoryBuild2Probe; //For HHJ & NLJ in probe
- private final IPredicateEvaluatorFactory predEvaluatorFactory;
+ private final IPredicateEvaluatorFactory probePredEvalFactory;
+ private final IPredicateEvaluatorFactory buildPredEvalFactory;
private final boolean isLeftOuter;
private final IMissingWriterFactory[] nonMatchWriterFactories;
@@ -148,8 +149,9 @@
IBinaryHashFunctionFamily[] propHashFunctionFactories,
IBinaryHashFunctionFamily[] buildHashFunctionFactories, RecordDescriptor recordDescriptor,
ITuplePairComparatorFactory tupPaircomparatorFactory01,
- ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory,
- boolean isLeftOuter, IMissingWriterFactory[] nonMatchWriterFactories) {
+ ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvalFactory0,
+ IPredicateEvaluatorFactory predEvalFactory1, boolean isLeftOuter,
+ IMissingWriterFactory[] nonMatchWriterFactories) {
super(spec, 2, 1);
this.memSizeInFrames = memSizeInFrames;
this.inputsize0 = inputsize0;
@@ -161,7 +163,8 @@
this.tuplePairComparatorFactoryProbe2Build = tupPaircomparatorFactory01;
this.tuplePairComparatorFactoryBuild2Probe = tupPaircomparatorFactory10;
outRecDescs[0] = recordDescriptor;
- this.predEvaluatorFactory = predEvaluatorFactory;
+ this.probePredEvalFactory = predEvalFactory0;
+ this.buildPredEvalFactory = predEvalFactory1;
this.isLeftOuter = isLeftOuter;
this.nonMatchWriterFactories = nonMatchWriterFactories;
}
@@ -171,10 +174,11 @@
IBinaryHashFunctionFamily[] propHashFunctionFactories,
IBinaryHashFunctionFamily[] buildHashFunctionFactories, RecordDescriptor recordDescriptor,
ITuplePairComparatorFactory tupPaircomparatorFactory01,
- ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory) {
+ ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvalFactory0,
+ IPredicateEvaluatorFactory predEvalFactory1) {
this(spec, memSizeInFrames, inputsize0, factor, keys0, keys1, propHashFunctionFactories,
buildHashFunctionFactories, recordDescriptor, tupPaircomparatorFactory01, tupPaircomparatorFactory10,
- predEvaluatorFactory, false, null);
+ predEvalFactory0, predEvalFactory1, false, null);
}
@Override
@@ -268,8 +272,10 @@
final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
- final IPredicateEvaluator predEvaluator =
- (predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+ final IPredicateEvaluator buildPredEval =
+ (buildPredEvalFactory == null ? null : buildPredEvalFactory.createPredicateEvaluator());
+ final IPredicateEvaluator probePredEval = (probePredEvalFactory == null || isLeftOuter ? null
+ : probePredEvalFactory.createPredicateEvaluator());
return new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
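Evaluator creation enforces the same invariant at the operator level: the build-side factory is always honored, while the probe-side factory is ignored (replaced with null) when isLeftOuter is true. Callers can therefore pass both factories unconditionally, and the constructor guard above can never fire for plans produced through this descriptor.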
@@ -293,7 +299,7 @@
getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions);
state.hybridHJ = new OptimizedHybridHashJoin(ctx.getJobletContext(), state.memForJoin,
state.numOfPartitions, PROBE_REL, BUILD_REL, probeRd, buildRd, probeHpc, buildHpc,
- predEvaluator, isLeftOuter, nonMatchWriterFactories);
+ probePredEval, buildPredEval, isLeftOuter, nonMatchWriterFactories);
state.hybridHJ.initBuild();
if (LOGGER.isTraceEnabled()) {
@@ -366,8 +372,6 @@
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final ITuplePairComparator probComp = tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
final ITuplePairComparator buildComp = tuplePairComparatorFactoryBuild2Probe.createTuplePairComparator(ctx);
- final IPredicateEvaluator predEvaluator =
- predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator();
final IMissingWriter[] nonMatchWriter =
isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
@@ -593,7 +597,7 @@
OptimizedHybridHashJoin rHHj;
int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
rHHj = new OptimizedHybridHashJoin(jobletCtx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeRd,
- buildRd, probeHpc, buildHpc, predEvaluator, isLeftOuter, nonMatchWriterFactories); //checked-confirmed
+ buildRd, probeHpc, buildHpc, null, null, isLeftOuter, nonMatchWriterFactories); //checked-confirmed
rHHj.setIsReversed(isReversed);
try {
@@ -748,7 +752,7 @@
ISerializableTable table = new SerializableHashTable(tabSize, jobletCtx, bufferManager);
InMemoryHashJoin joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRDesc),
hpcRepProbe, new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, isLeftOuter,
- nonMatchWriter, table, predEvaluator, isReversed, bufferManager);
+ nonMatchWriter, table, isReversed, bufferManager);
joiner.setComparator(comp);
try {
bReader.open();
@@ -805,8 +809,7 @@
boolean isReversed = outerRd == buildRd && innerRd == probeRd;
ITuplePairComparator nljComptorOuterInner = isReversed ? buildComp : probComp;
NestedLoopJoin nlj = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(outerRd),
- new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter, nonMatchWriter,
- isReversed);
+ new FrameTupleAccessor(innerRd), memorySize, isLeftOuter, nonMatchWriter, isReversed);
nlj.setComparator(nljComptorOuterInner);
IFrame cacheBuff = new VSizeFrame(jobletCtx);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
index 93739df..9215856 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
@@ -26,7 +26,6 @@
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -61,7 +60,6 @@
static RecordDescriptor buildRd;
static ITuplePartitionComputer probeHpc;
static ITuplePartitionComputer buildHpc;
- static IPredicateEvaluator predEval;
int memSizeInFrames = -1;
int numOfPartitions = -1;
boolean isLeftOuter = false;
@@ -151,7 +149,7 @@
private void testJoin(int memSizeInFrames, int numOfPartitions, VSizeFrame frame) throws HyracksDataException {
hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames, numOfPartitions, probeRelName, buildRelName, probeRd,
- buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
+ buildRd, probeHpc, buildHpc, null, null, isLeftOuter, null);
hhj.initBuild();
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index a8bb1d5..296b682 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -133,7 +133,7 @@
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 128,
- null, 128);
+ 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -176,12 +176,12 @@
new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
- OptimizedHybridHashJoinOperatorDescriptor join =
- new OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20, 1.2, new int[] { 1 }, new int[] { 0 },
- new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
- new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
- custOrderJoinDesc, new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
- new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null, false, null);
+ OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20,
+ 1.2, new int[] { 1 }, new int[] { 0 },
+ new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
+ new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }, custOrderJoinDesc,
+ new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
+ new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null, null, false, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
@@ -234,8 +234,8 @@
new int[] { 1 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
- new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null, custOrderJoinDesc,
- true, nonMatchWriterFactories, 128, 128);
+ new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), custOrderJoinDesc, true,
+ nonMatchWriterFactories, 128, 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -288,7 +288,7 @@
new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
custOrderJoinDesc, new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, true,
+ new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, null, true,
nonMatchWriterFactories);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
@@ -343,7 +343,7 @@
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 128,
- null, 128);
+ 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -396,12 +396,12 @@
new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
- OptimizedHybridHashJoinOperatorDescriptor join =
- new OptimizedHybridHashJoinOperatorDescriptor(spec, 5, 20, 1.2, new int[] { 1 }, new int[] { 0 },
- new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
- new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
- custOrderJoinDesc, new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
- new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null, false, null);
+ OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 5, 20, 1.2,
+ new int[] { 1 }, new int[] { 0 },
+ new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
+ new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }, custOrderJoinDesc,
+ new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
+ new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null, null, false, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
@@ -460,7 +460,7 @@
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 128,
- null, 128);
+ 128);
PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
ResultSetId rsId = new ResultSetId(1);
@@ -524,7 +524,7 @@
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), custOrderJoinDesc, 128,
- null, 128);
+ 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -581,7 +581,7 @@
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
custOrderJoinDesc, new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null);
+ new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
@@ -629,7 +629,7 @@
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
custOrderJoinDesc, new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null);
+ new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
@@ -678,7 +678,7 @@
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
custOrderJoinDesc, new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null);
+ new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index 3bbeca8..a351c85 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -185,7 +185,7 @@
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- Common.custOrderJoinDesc, tableSize, null, memSize * frameSize);
+ Common.custOrderJoinDesc, tableSize, memSize * frameSize);
} else if ("hybrid".equalsIgnoreCase(algo)) {
join = new OptimizedHybridHashJoinOperatorDescriptor(spec, memSize, graceInputSize, graceFactor,
@@ -194,7 +194,7 @@
new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
Common.custOrderJoinDesc,
new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null);
+ new JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null, null);
} else {
System.err.println("unknown algorithm:" + algo);