[ASTERIXDB-2857][RT] Incorrect result for nested loop outer join
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Fix incorrect number of unmatched tuples emitted
by nested loop implementation of left outer join
- Add RunFileWriter.eraseClosed() method
Change-Id: Ib56cf82fbe335a1d8f5d69caaf51e746db252202
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10785
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/query-ASTERIXDB-2857.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/query-ASTERIXDB-2857.sqlpp
new file mode 100644
index 0000000..2304349
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/query-ASTERIXDB-2857.sqlpp
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test nested loop implementation of left outer join
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type tenkType as closed {
+ unique1 : integer,
+ unique2 : integer,
+ two : integer,
+ four : integer,
+ ten : integer,
+ twenty : integer,
+ hundred : integer,
+ thousand : integer,
+ twothousand : integer,
+ fivethous : integer,
+ tenthous : integer,
+ odd100 : integer,
+ even100 : integer,
+ stringu1 : string,
+ stringu2 : string,
+ string4 : string
+};
+
+create dataset tenk(tenkType) primary key unique2;
+
+SELECT
+ t0.unique1 AS t0_unique1,
+ t1.unique1 AS t1_unique1,
+ t2.unique1 AS t2_unique1
+FROM (
+ SELECT unique1, unique2 FROM tenk WHERE unique2 < 2
+) t0
+INNER JOIN (
+ SELECT unique1, unique2 FROM tenk WHERE unique2 < 4
+) t1 ON t0.unique2 = t1.unique2
+LEFT JOIN (
+ SELECT unique1, unique2 FROM tenk WHERE unique2 < 6
+) t2 ON t0.unique2 + t2.unique2 = 2 * t1.unique2
+ORDER BY t0_unique1, t1_unique1, t2_unique1;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/query-ASTERIXDB-2857.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/query-ASTERIXDB-2857.plan
new file mode 100644
index 0000000..393f1db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/query-ASTERIXDB-2857.plan
@@ -0,0 +1,39 @@
+-- DISTRIBUTE_RESULT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- SORT_MERGE_EXCHANGE [$$136(ASC), $$137(ASC), $#3(ASC) ] |PARTITIONED|
+ -- STABLE_SORT [$$136(ASC), $$137(ASC), $#3(ASC)] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- NESTED_LOOP |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- HYBRID_HASH_JOIN [$$127][$$128] |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH (test.tenk.tenk) |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH (test.tenk.tenk) |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- BTREE_SEARCH (test.tenk.tenk) |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.1.ddl.sqlpp
new file mode 100644
index 0000000..87b5d75
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.1.ddl.sqlpp
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test nested loop implementation of left outer join
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type tenkType as closed {
+ unique1 : integer,
+ unique2 : integer,
+ two : integer,
+ four : integer,
+ ten : integer,
+ twenty : integer,
+ hundred : integer,
+ thousand : integer,
+ twothousand : integer,
+ fivethous : integer,
+ tenthous : integer,
+ odd100 : integer,
+ even100 : integer,
+ stringu1 : string,
+ stringu2 : string,
+ string4 : string
+};
+
+create dataset tenk(tenkType) primary key unique2;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.2.update.sqlpp
new file mode 100644
index 0000000..2d7e768
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.2.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+load dataset tenk using localfs ((`path`=`asterix_nc1://data/tenk.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.query.sqlpp
new file mode 100644
index 0000000..823a540
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.query.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test nested loop implementation of left outer join
+ */
+
+USE test;
+
+SELECT
+ t0.unique1 AS t0_unique1,
+ t1.unique1 AS t1_unique1,
+ t2.unique1 AS t2_unique1
+FROM (
+ SELECT unique1, unique2 FROM tenk WHERE unique2 < 2
+) t0
+INNER JOIN (
+ SELECT unique1, unique2 FROM tenk WHERE unique2 < 4
+) t1 ON t0.unique2 = t1.unique2
+LEFT JOIN (
+ SELECT unique1, unique2 FROM tenk WHERE unique2 < 6
+) t2 ON t0.unique2 + t2.unique2 = 2 * t1.unique2
+ORDER BY t0_unique1, t1_unique1, t2_unique1;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.adm
new file mode 100644
index 0000000..1a31db8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.adm
@@ -0,0 +1,2 @@
+{ "t0_unique1": 1891, "t1_unique1": 1891, "t2_unique1": 1891 }
+{ "t0_unique1": 8800, "t1_unique1": 8800, "t2_unique1": 8800 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 9dc621d..ce440c1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -12983,6 +12983,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="leftouterjoin">
+ <compilation-unit name="query-ASTERIXDB-2857">
+ <output-dir compare="Text">query-ASTERIXDB-2857</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="leftouterjoin">
<compilation-unit name="right_branch_opt_1">
<output-dir compare="Text">right_branch_opt_1</output-dir>
</compilation-unit>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
index dabdd4f..c370b58 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
@@ -75,9 +75,15 @@
}
public void erase() throws HyracksDataException {
- close();
- file.delete();
+ try {
+ close();
+ } finally {
+ eraseClosed();
+ }
+ }
+ public void eraseClosed() {
+ file.delete();
// Make sure we never access the file if it is deleted.
file = null;
handle = null;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index a5ad500..03ff72f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -20,6 +20,7 @@
import java.io.DataOutput;
import java.nio.ByteBuffer;
+import java.util.BitSet;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -28,6 +29,7 @@
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -43,6 +45,14 @@
import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
public class NestedLoopJoin {
+ // Note: Min memory budget should be less than {@code AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN}
+ // Inner join: 1 frame for the outer input side, 1 frame for the inner input side, 1 frame for the output
+ private static final int MIN_FRAME_BUDGET_INNER_JOIN = 3;
+ // Outer join extra: Add 1 frame for the {@code outerMatchLOJ} bitset
+ private static final int MIN_FRAME_BUDGET_OUTER_JOIN = MIN_FRAME_BUDGET_INNER_JOIN + 1;
+ // Outer join needs 1 bit per each tuple in the outer side buffer
+ private static final int ESTIMATE_AVG_TUPLE_SIZE = 128;
+
private final FrameTupleAccessor accessorInner;
private final FrameTupleAccessor accessorOuter;
private final FrameTupleAppender appender;
@@ -54,30 +64,45 @@
private final boolean isLeftOuter;
private final ArrayTupleBuilder missingTupleBuilder;
private final IPredicateEvaluator predEvaluator;
- private boolean isReversed; //Added for handling correct calling for predicate-evaluator upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
+ // Added for handling correct calling for predicate-evaluator upon recursive calls
+ // (in OptimizedHybridHashJoin) that cause role-reversal
+ private final boolean isReversed;
private final BufferInfo tempInfo = new BufferInfo(null, -1, -1);
+ private final BitSet outerMatchLOJ;
public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter,
- FrameTupleAccessor accessorInner, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter,
+ FrameTupleAccessor accessorInner, int memBudgetInFrames, IPredicateEvaluator predEval, boolean isLeftOuter,
IMissingWriter[] missingWriters) throws HyracksDataException {
+ this(jobletContext, accessorOuter, accessorInner, memBudgetInFrames, predEval, isLeftOuter, missingWriters,
+ false);
+ }
+
+ public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter,
+ FrameTupleAccessor accessorInner, int memBudgetInFrames, IPredicateEvaluator predEval, boolean isLeftOuter,
+ IMissingWriter[] missingWriters, boolean isReversed) throws HyracksDataException {
this.accessorInner = accessorInner;
this.accessorOuter = accessorOuter;
this.appender = new FrameTupleAppender();
this.outBuffer = new VSizeFrame(jobletContext);
this.innerBuffer = new VSizeFrame(jobletContext);
this.appender.reset(outBuffer, true);
- if (memSize < 3) {
- throw new HyracksDataException("Not enough memory is available for Nested Loop Join");
+
+ int minMemBudgetInFrames = isLeftOuter ? MIN_FRAME_BUDGET_OUTER_JOIN : MIN_FRAME_BUDGET_INNER_JOIN;
+ if (memBudgetInFrames < minMemBudgetInFrames) {
+ throw new HyracksDataException(ErrorCode.INSUFFICIENT_MEMORY);
}
+ int outerBufferMngrMemBudgetInFrames = memBudgetInFrames - minMemBudgetInFrames + 1;
+ int outerBufferMngrMemBudgetInBytes = jobletContext.getInitialFrameSize() * outerBufferMngrMemBudgetInFrames;
this.outerBufferMngr = new VariableFrameMemoryManager(
- new VariableFramePool(jobletContext, jobletContext.getInitialFrameSize() * (memSize - 2)),
- FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, memSize - 2));
+ new VariableFramePool(jobletContext, outerBufferMngrMemBudgetInBytes), FrameFreeSlotPolicyFactory
+ .createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, outerBufferMngrMemBudgetInFrames));
this.predEvaluator = predEval;
- this.isReversed = false;
-
this.isLeftOuter = isLeftOuter;
if (isLeftOuter) {
+ if (isReversed) {
+ throw new HyracksDataException(ErrorCode.ILLEGAL_STATE, "Outer join cannot reverse roles");
+ }
int innerFieldCount = this.accessorInner.getFieldCount();
missingTupleBuilder = new ArrayTupleBuilder(innerFieldCount);
DataOutput out = missingTupleBuilder.getDataOutput();
@@ -85,9 +110,14 @@
missingWriters[i].writeMissing(out);
missingTupleBuilder.addFieldEndOffset();
}
+ // Outer join needs 1 bit per each tuple in the outer side buffer
+ int outerMatchLOJCardinalityEstimate = outerBufferMngrMemBudgetInBytes / ESTIMATE_AVG_TUPLE_SIZE;
+ outerMatchLOJ = new BitSet(Math.max(outerMatchLOJCardinalityEstimate, 1));
} else {
missingTupleBuilder = null;
+ outerMatchLOJ = null;
}
+ this.isReversed = isReversed;
FileReference file =
jobletContext.createManagedWorkspaceFile(this.getClass().getSimpleName() + this.toString());
@@ -117,23 +147,7 @@
return;
}
if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
- RunFileReader runFileReader = runFileWriter.createReader();
- try {
- runFileReader.open();
- if (runFileReader.nextFrame(innerBuffer)) {
- do {
- for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
- blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
- }
- } while (runFileReader.nextFrame(innerBuffer));
- } else if (isLeftOuter) {
- for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
- appendMissing(outerBufferMngr.getFrame(i, tempInfo), writer);
- }
- }
- } finally {
- runFileReader.close();
- }
+ multiBlockJoin(writer);
outerBufferMngr.reset();
if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
throw new HyracksDataException("The given outer frame of size:" + outerBuffer.capacity()
@@ -142,16 +156,51 @@
}
}
- private void blockJoin(BufferInfo outerBufferInfo, ByteBuffer innerBuffer, IFrameWriter writer)
- throws HyracksDataException {
- accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(), outerBufferInfo.getLength());
- accessorInner.reset(innerBuffer);
- int tupleCount0 = accessorOuter.getTupleCount();
- int tupleCount1 = accessorInner.getTupleCount();
+ private void multiBlockJoin(IFrameWriter writer) throws HyracksDataException {
+ int outerBufferFrameCount = outerBufferMngr.getNumFrames();
+ if (outerBufferFrameCount == 0) {
+ return;
+ }
+ RunFileReader runFileReader = runFileWriter.createReader();
+ try {
+ runFileReader.open();
+ if (isLeftOuter) {
+ outerMatchLOJ.clear();
+ }
+ while (runFileReader.nextFrame(innerBuffer)) {
+ int outerTupleRunningCount = 0;
+ for (int i = 0; i < outerBufferFrameCount; i++) {
+ BufferInfo outerBufferInfo = outerBufferMngr.getFrame(i, tempInfo);
+ accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(),
+ outerBufferInfo.getLength());
+ int outerTupleCount = accessorOuter.getTupleCount();
+ accessorInner.reset(innerBuffer.getBuffer());
+ blockJoin(outerTupleRunningCount, writer);
+ outerTupleRunningCount += outerTupleCount;
+ }
+ }
+ if (isLeftOuter) {
+ int outerTupleRunningCount = 0;
+ for (int i = 0; i < outerBufferFrameCount; i++) {
+ BufferInfo outerBufferInfo = outerBufferMngr.getFrame(i, tempInfo);
+ accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(),
+ outerBufferInfo.getLength());
+ int outerFrameTupleCount = accessorOuter.getTupleCount();
+ appendMissing(outerTupleRunningCount, outerFrameTupleCount, writer);
+ outerTupleRunningCount += outerFrameTupleCount;
+ }
+ }
+ } finally {
+ runFileReader.close();
+ }
+ }
- for (int i = 0; i < tupleCount0; ++i) {
+ private void blockJoin(int outerTupleStartPos, IFrameWriter writer) throws HyracksDataException {
+ int outerTupleCount = accessorOuter.getTupleCount();
+ int innerTupleCount = accessorInner.getTupleCount();
+ for (int i = 0; i < outerTupleCount; ++i) {
boolean matchFound = false;
- for (int j = 0; j < tupleCount1; ++j) {
+ for (int j = 0; j < innerTupleCount; ++j) {
int c = tpComparator.compare(accessorOuter, i, accessorInner, j);
boolean prdEval = evaluatePredicate(i, j);
if (c == 0 && prdEval) {
@@ -159,13 +208,8 @@
appendToResults(i, j, writer);
}
}
-
- if (!matchFound && isLeftOuter) {
- final int[] ntFieldEndOffsets = missingTupleBuilder.getFieldEndOffsets();
- final byte[] ntByteArray = missingTupleBuilder.getByteArray();
- final int ntSize = missingTupleBuilder.getSize();
- FrameUtils.appendConcatToWriter(writer, appender, accessorOuter, i, ntFieldEndOffsets, ntByteArray, 0,
- ntSize);
+ if (isLeftOuter && matchFound) {
+ outerMatchLOJ.set(outerTupleStartPos + i);
}
}
}
@@ -191,15 +235,18 @@
FrameUtils.appendConcatToWriter(writer, appender, accessor1, tupleId1, accessor2, tupleId2);
}
- private void appendMissing(BufferInfo outerBufferInfo, IFrameWriter writer) throws HyracksDataException {
- accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(), outerBufferInfo.getLength());
- int tupleCount = accessorOuter.getTupleCount();
- for (int i = 0; i < tupleCount; ++i) {
- final int[] ntFieldEndOffsets = missingTupleBuilder.getFieldEndOffsets();
- final byte[] ntByteArray = missingTupleBuilder.getByteArray();
- final int ntSize = missingTupleBuilder.getSize();
- FrameUtils.appendConcatToWriter(writer, appender, accessorOuter, i, ntFieldEndOffsets, ntByteArray, 0,
- ntSize);
+ private void appendMissing(int outerFrameMngrStartPos, int outerFrameTupleCount, IFrameWriter writer)
+ throws HyracksDataException {
+ int limit = outerFrameMngrStartPos + outerFrameTupleCount;
+ for (int outerTuplePos =
+ outerMatchLOJ.nextClearBit(outerFrameMngrStartPos); outerTuplePos < limit; outerTuplePos =
+ outerMatchLOJ.nextClearBit(outerTuplePos + 1)) {
+ int[] ntFieldEndOffsets = missingTupleBuilder.getFieldEndOffsets();
+ byte[] ntByteArray = missingTupleBuilder.getByteArray();
+ int ntSize = missingTupleBuilder.getSize();
+ int outerAccessorTupleIndex = outerTuplePos - outerFrameMngrStartPos;
+ FrameUtils.appendConcatToWriter(writer, appender, accessorOuter, outerAccessorTupleIndex, ntFieldEndOffsets,
+ ntByteArray, 0, ntSize);
}
}
@@ -210,22 +257,10 @@
}
public void completeJoin(IFrameWriter writer) throws HyracksDataException {
- RunFileReader runFileReader = runFileWriter.createDeleteOnCloseReader();
try {
- runFileReader.open();
- if (runFileReader.nextFrame(innerBuffer)) {
- do {
- for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
- blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(), writer);
- }
- } while (runFileReader.nextFrame(innerBuffer));
- } else if (isLeftOuter) {
- for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
- appendMissing(outerBufferMngr.getFrame(i, tempInfo), writer);
- }
- }
+ multiBlockJoin(writer);
} finally {
- runFileReader.close();
+ runFileWriter.eraseClosed();
}
appender.write(writer, true);
}
@@ -233,8 +268,4 @@
public void releaseMemory() throws HyracksDataException {
outerBufferMngr.reset();
}
-
- public void setIsReversed(boolean b) {
- this.isReversed = b;
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index c142113..bb79981 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -802,11 +802,10 @@
// The nested loop join result is outer + inner. All the other operator is probe + build.
// Hence the reverse relation is different.
boolean isReversed = outerRd == buildRd && innerRd == probeRd;
- assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
ITuplePairComparator nljComptorOuterInner = isReversed ? buildComp : probComp;
NestedLoopJoin nlj = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(outerRd),
- new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter, nonMatchWriter);
- nlj.setIsReversed(isReversed);
+ new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter, nonMatchWriter,
+ isReversed);
nlj.setComparator(nljComptorOuterInner);
IFrame cacheBuff = new VSizeFrame(jobletCtx);