[NO ISSUE] Refactored Merge Join to use Cursor
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
-Adds A Cursor for ITuplePointerAccessor, and IFrameTupleAccessor
using an interface
-Minor refactoring
-Removes ITuple Accessor
Change-Id: I3134e77d521cf192e9948912720414f868cdc83b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7703
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/IntervalMergeJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/IntervalMergeJoiner.java
index d1d2310..c19ad06 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/IntervalMergeJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/IntervalMergeJoiner.java
@@ -19,16 +19,14 @@
package org.apache.asterix.runtime.operators.joins.interval;
import java.nio.ByteBuffer;
-import java.util.Iterator;
import java.util.LinkedList;
import org.apache.asterix.runtime.operators.joins.interval.utils.IIntervalJoinUtil;
-import org.apache.asterix.runtime.operators.joins.interval.utils.memory.ITupleAccessor;
+import org.apache.asterix.runtime.operators.joins.interval.utils.memory.FrameTupleCursor;
import org.apache.asterix.runtime.operators.joins.interval.utils.memory.IntervalSideTuple;
-import org.apache.asterix.runtime.operators.joins.interval.utils.memory.IntervalVariableDeletableTupleMemoryManager;
import org.apache.asterix.runtime.operators.joins.interval.utils.memory.RunFilePointer;
import org.apache.asterix.runtime.operators.joins.interval.utils.memory.RunFileStream;
-import org.apache.asterix.runtime.operators.joins.interval.utils.memory.TupleAccessor;
+import org.apache.asterix.runtime.operators.joins.interval.utils.memory.TuplePointerCursor;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -41,6 +39,7 @@
import org.apache.hyracks.dataflow.std.buffermanager.DeallocatableFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.IDeallocatableFramePool;
import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager;
+import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
/**
@@ -52,22 +51,9 @@
*/
public class IntervalMergeJoiner {
- public enum TupleStatus {
- LOADED,
- EMPTY;
-
- public boolean isLoaded() {
- return this.equals(LOADED);
- }
-
- public boolean isEmpty() {
- return this.equals(EMPTY);
- }
- }
-
private final IDeallocatableFramePool framePool;
private final IDeletableTupleBufferManager bufferManager;
- private final ITupleAccessor memoryAccessor;
+ private final TuplePointerCursor memoryCursor;
private final LinkedList<TuplePointer> memoryBuffer = new LinkedList<>();
private final RunFileStream runFileStream;
@@ -84,7 +70,7 @@
protected final IFrame[] inputBuffer;
protected final FrameTupleAppender resultAppender;
- protected final ITupleAccessor[] inputAccessor;
+ protected final FrameTupleCursor[] inputCursor;
public IntervalMergeJoiner(IHyracksTaskContext ctx, int memorySize, IIntervalJoinUtil mjc, int buildKeys,
int probeKeys, RecordDescriptor buildRd, RecordDescriptor probeRd) throws HyracksDataException {
@@ -92,13 +78,13 @@
// Memory (probe buffer)
if (memorySize < 5) {
- throw new HyracksDataException(
- "MergeJoiner does not have enough memory (needs > 4, got " + memorySize + ").");
+ throw new RuntimeException(
+ "IntervalMergeJoiner does not have enough memory (needs > 4, got " + memorySize + ").");
}
- inputAccessor = new TupleAccessor[JOIN_PARTITIONS];
- inputAccessor[BUILD_PARTITION] = new TupleAccessor(buildRd);
- inputAccessor[PROBE_PARTITION] = new TupleAccessor(probeRd);
+ inputCursor = new FrameTupleCursor[JOIN_PARTITIONS];
+ inputCursor[BUILD_PARTITION] = new FrameTupleCursor(buildRd);
+ inputCursor[PROBE_PARTITION] = new FrameTupleCursor(probeRd);
inputBuffer = new IFrame[JOIN_PARTITIONS];
inputBuffer[BUILD_PARTITION] = new VSizeFrame(ctx);
@@ -106,164 +92,143 @@
//Two frames are used for the runfile stream, and one frame for each input (2 outputs).
framePool = new DeallocatableFramePool(ctx, (memorySize - 4) * ctx.getInitialFrameSize());
- bufferManager = new IntervalVariableDeletableTupleMemoryManager(framePool, probeRd);
- memoryAccessor = ((IntervalVariableDeletableTupleMemoryManager) bufferManager).createTupleAccessor();
+ bufferManager = new VariableDeletableTupleMemoryManager(framePool, probeRd);
+ memoryCursor = new TuplePointerCursor(bufferManager.createTuplePointerAccessor());
// Run File and frame cache (build buffer)
- runFileStream = new RunFileStream(ctx, "ismj-left");
+ runFileStream = new RunFileStream(ctx, "imj-build");
runFilePointer = new RunFilePointer();
runFileStream.createRunFileWriting();
runFileStream.startRunFileWriting();
- memoryTuple = new IntervalSideTuple(mjc, memoryAccessor, probeKeys);
+ memoryTuple = new IntervalSideTuple(mjc, memoryCursor, probeKeys);
inputTuple = new IntervalSideTuple[JOIN_PARTITIONS];
- inputTuple[PROBE_PARTITION] = new IntervalSideTuple(mjc, inputAccessor[PROBE_PARTITION], probeKeys);
- inputTuple[BUILD_PARTITION] = new IntervalSideTuple(mjc, inputAccessor[BUILD_PARTITION], buildKeys);
+ inputTuple[PROBE_PARTITION] = new IntervalSideTuple(mjc, inputCursor[PROBE_PARTITION], probeKeys);
+ inputTuple[BUILD_PARTITION] = new IntervalSideTuple(mjc, inputCursor[BUILD_PARTITION], buildKeys);
// Result
this.resultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
}
public void processBuildFrame(ByteBuffer buffer) throws HyracksDataException {
- inputAccessor[BUILD_PARTITION].reset(buffer);
- for (int x = 0; x < inputAccessor[BUILD_PARTITION].getTupleCount(); x++) {
- runFileStream.addToRunFile(inputAccessor[BUILD_PARTITION], x);
+ inputCursor[BUILD_PARTITION].reset(buffer);
+ for (int x = 0; x < inputCursor[BUILD_PARTITION].getAccessor().getTupleCount(); x++) {
+ runFileStream.addToRunFile(inputCursor[BUILD_PARTITION].getAccessor(), x);
}
}
public void processBuildClose() throws HyracksDataException {
runFileStream.flushRunFile();
- runFileStream.startReadingRunFile(inputAccessor[BUILD_PARTITION]);
+ runFileStream.startReadingRunFile(inputCursor[BUILD_PARTITION]);
}
public void processProbeFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
- inputAccessor[PROBE_PARTITION].reset(buffer);
- inputAccessor[PROBE_PARTITION].next();
-
- TupleStatus buildTs = loadBuildTuple();
- TupleStatus probeTs = loadProbeTuple();
- while (buildTs.isLoaded() && probeTs.isLoaded()) {
- if (probeTs.isLoaded() && mjc.checkToLoadNextProbeTuple(inputAccessor[BUILD_PARTITION],
- inputAccessor[BUILD_PARTITION].getTupleId(), inputAccessor[PROBE_PARTITION],
- inputAccessor[PROBE_PARTITION].getTupleId())) {
- // Right side from stream
+ inputCursor[PROBE_PARTITION].reset(buffer);
+ while (buildHasNext() && inputCursor[PROBE_PARTITION].hasNext()) {
+ if (inputCursor[PROBE_PARTITION].hasNext() && mjc.checkToLoadNextProbeTuple(
+ inputCursor[BUILD_PARTITION].getAccessor(), inputCursor[BUILD_PARTITION].getTupleId() + 1,
+ inputCursor[PROBE_PARTITION].getAccessor(), inputCursor[PROBE_PARTITION].getTupleId() + 1)) {
+ // Process probe side from stream
+ inputCursor[PROBE_PARTITION].next();
processProbeTuple(writer);
- probeTs = loadProbeTuple();
} else {
- // Left side from stream
+ // Process build side from runfile
+ inputCursor[BUILD_PARTITION].next();
processBuildTuple(writer);
- buildTs = loadBuildTuple();
}
}
}
public void processProbeClose(IFrameWriter writer) throws HyracksDataException {
-
- TupleStatus buildTs = loadBuildTuple();
- while (buildTs.isLoaded() && memoryHasTuples()) {
- // Left side from stream
+ while (buildHasNext() && memoryHasTuples()) {
+ // Process build side from runfile
+ inputCursor[BUILD_PARTITION].next();
processBuildTuple(writer);
- buildTs = loadBuildTuple();
}
-
resultAppender.write(writer, true);
runFileStream.close();
runFileStream.removeRunFile();
}
- private TupleStatus loadProbeTuple() {
- TupleStatus loaded;
- if (inputAccessor[PROBE_PARTITION] != null && inputAccessor[PROBE_PARTITION].exists()) {
- // Still processing frame.
- loaded = TupleStatus.LOADED;
+ private boolean buildHasNext() throws HyracksDataException {
+ if (!inputCursor[BUILD_PARTITION].hasNext()) {
+ // Must keep condition in a separate `if` due to actions applied in loadNextBuffer.
+ return runFileStream.loadNextBuffer(inputCursor[BUILD_PARTITION]);
} else {
- // No more frames or tuples to process.
- loaded = TupleStatus.EMPTY;
+ return true;
}
- return loaded;
- }
-
- private TupleStatus loadBuildTuple() throws HyracksDataException {
- if (!inputAccessor[BUILD_PARTITION].exists()) {
- // Must keep condition in a separate if due to actions applied in loadNextBuffer.
- if (!runFileStream.loadNextBuffer(inputAccessor[BUILD_PARTITION])) {
- return TupleStatus.EMPTY;
- }
- }
- return TupleStatus.LOADED;
}
private void processBuildTuple(IFrameWriter writer) throws HyracksDataException {
// Check against memory
if (memoryHasTuples()) {
inputTuple[BUILD_PARTITION].loadTuple();
- Iterator<TuplePointer> memoryIterator = memoryBuffer.iterator();
- while (memoryIterator.hasNext()) {
- TuplePointer tp = memoryIterator.next();
- memoryTuple.setTuple(tp);
+ memoryCursor.reset(memoryBuffer.iterator());
+ while (memoryCursor.hasNext()) {
+ memoryCursor.next();
+ memoryTuple.loadTuple();
if (inputTuple[BUILD_PARTITION].removeFromMemory(memoryTuple)) {
// remove from memory
- bufferManager.deleteTuple(tp);
- memoryIterator.remove();
+ bufferManager.deleteTuple(memoryCursor.getTuplePointer());
+ memoryCursor.remove();
continue;
} else if (inputTuple[BUILD_PARTITION].checkForEarlyExit(memoryTuple)) {
// No more possible comparisons
break;
} else if (inputTuple[BUILD_PARTITION].compareJoin(memoryTuple)) {
// add to result
- addToResult(inputAccessor[BUILD_PARTITION], inputAccessor[BUILD_PARTITION].getTupleId(),
- memoryAccessor, tp.getTupleIndex(), writer);
+ addToResult(inputCursor[BUILD_PARTITION].getAccessor(), inputCursor[BUILD_PARTITION].getTupleId(),
+ memoryCursor.getAccessor(), memoryCursor.getTupleId(), writer);
}
}
}
- inputAccessor[BUILD_PARTITION].next();
}
private void processProbeTuple(IFrameWriter writer) throws HyracksDataException {
// append to memory
- if (mjc.checkToSaveInMemory(inputAccessor[BUILD_PARTITION], inputAccessor[BUILD_PARTITION].getTupleId(),
- inputAccessor[PROBE_PARTITION], inputAccessor[PROBE_PARTITION].getTupleId())) {
- if (!addToMemory(inputAccessor[PROBE_PARTITION])) {
- unfreezeAndClearMemory(writer, inputAccessor[BUILD_PARTITION]);
- return;
+ // BUILD Cursor is guaranteed to have next
+ if (mjc.checkToSaveInMemory(inputCursor[BUILD_PARTITION].getAccessor(),
+ inputCursor[BUILD_PARTITION].getTupleId() + 1, inputCursor[PROBE_PARTITION].getAccessor(),
+ inputCursor[PROBE_PARTITION].getTupleId())) {
+ if (!addToMemory(inputCursor[PROBE_PARTITION].getAccessor(), inputCursor[PROBE_PARTITION].getTupleId())) {
+ unfreezeAndClearMemory(writer);
+ if (!addToMemory(inputCursor[PROBE_PARTITION].getAccessor(),
+ inputCursor[PROBE_PARTITION].getTupleId())) {
+ throw new RuntimeException("Should Never get called.");
+ }
}
}
- inputAccessor[PROBE_PARTITION].next();
}
- private void unfreezeAndClearMemory(IFrameWriter writer, ITupleAccessor accessor) throws HyracksDataException {
- runFilePointer.reset(runFileStream.getReadPointer(), inputAccessor[BUILD_PARTITION].getTupleId());
- TupleStatus buildTs = loadBuildTuple();
- while (buildTs.isLoaded() && memoryHasTuples()) {
- // Left side from stream
+ private void unfreezeAndClearMemory(IFrameWriter writer) throws HyracksDataException {
+ runFilePointer.reset(runFileStream.getReadPointer(), inputCursor[BUILD_PARTITION].getTupleId());
+ while (buildHasNext() && memoryHasTuples()) {
+ // Process build side from runfile
+ inputCursor[BUILD_PARTITION].next();
processBuildTuple(writer);
- buildTs = loadBuildTuple();
}
- // Finish writing
- runFileStream.flushRunFile();
// Clear memory
memoryBuffer.clear();
bufferManager.reset();
// Start reading
- runFileStream.startReadingRunFile(accessor, runFilePointer.getFileOffset());
- accessor.setTupleId(runFilePointer.getTupleIndex());
- runFilePointer.reset(-1, -1);
+ runFileStream.startReadingRunFile(inputCursor[BUILD_PARTITION], runFilePointer.getFileOffset());
+ inputCursor[BUILD_PARTITION].resetPosition(runFilePointer.getTupleIndex());
}
- private boolean addToMemory(ITupleAccessor accessor) throws HyracksDataException {
+ private boolean addToMemory(IFrameTupleAccessor accessor, int tupleId) throws HyracksDataException {
TuplePointer tp = new TuplePointer();
- if (bufferManager.insertTuple(accessor, accessor.getTupleId(), tp)) {
+ if (bufferManager.insertTuple(accessor, tupleId, tp)) {
memoryBuffer.add(tp);
return true;
}
return false;
}
- private void addToResult(IFrameTupleAccessor accessorLeft, int leftTupleIndex, IFrameTupleAccessor accessorRight,
- int rightTupleIndex, IFrameWriter writer) throws HyracksDataException {
- FrameUtils.appendConcatToWriter(writer, resultAppender, accessorLeft, leftTupleIndex, accessorRight,
- rightTupleIndex);
+ private void addToResult(IFrameTupleAccessor buildAccessor, int buildTupleId, IFrameTupleAccessor probeAccessor,
+ int probeTupleId, IFrameWriter writer) throws HyracksDataException {
+ FrameUtils.appendConcatToWriter(writer, resultAppender, buildAccessor, buildTupleId, probeAccessor,
+ probeTupleId);
}
private boolean memoryHasTuples() {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/AbstractTupleAccessor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/AbstractTupleAccessor.java
deleted file mode 100644
index 397f959..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/AbstractTupleAccessor.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public abstract class AbstractTupleAccessor implements ITupleAccessor {
- public static final int UNSET = -2;
- public static final int INITIALIZED = -1;
-
- protected int tupleId = UNSET;
-
- protected int frameId;
-
- protected abstract IFrameTupleAccessor getInnerAccessor();
-
- protected abstract void resetInnerAccessor(int frameId);
-
- protected abstract void resetInnerAccessor(TuplePointer tp);
-
- protected abstract int getFrameCount();
-
- @Override
- public int getTupleStartOffset() {
- return getTupleStartOffset(tupleId);
- }
-
- @Override
- public int getTupleLength() {
- return getTupleLength(tupleId);
- }
-
- @Override
- public int getAbsFieldStartOffset(int fieldId) {
- return getAbsoluteFieldStartOffset(tupleId, fieldId);
- }
-
- @Override
- public int getFieldLength(int fieldId) {
- return getFieldLength(tupleId, fieldId);
- }
-
- @Override
- public ByteBuffer getBuffer() {
- return getInnerAccessor().getBuffer();
- }
-
- @Override
- public int getFieldCount() {
- return getInnerAccessor().getFieldCount();
- }
-
- @Override
- public int getFieldSlotsLength() {
- return getInnerAccessor().getFieldSlotsLength();
- }
-
- @Override
- public int getFieldEndOffset(int tupleIndex, int fIdx) {
- return getInnerAccessor().getFieldEndOffset(tupleId, fIdx);
- }
-
- @Override
- public int getFieldStartOffset(int tupleIndex, int fIdx) {
- return getInnerAccessor().getFieldStartOffset(tupleIndex, fIdx);
- }
-
- @Override
- public int getFieldLength(int tupleIndex, int fIdx) {
- return getInnerAccessor().getFieldLength(tupleIndex, fIdx);
- }
-
- @Override
- public int getTupleLength(int tupleIndex) {
- return getInnerAccessor().getTupleLength(tupleIndex);
- }
-
- @Override
- public int getTupleEndOffset(int tupleIndex) {
- return getInnerAccessor().getTupleEndOffset(tupleIndex);
- }
-
- @Override
- public int getTupleStartOffset(int tupleIndex) {
- return getInnerAccessor().getTupleStartOffset(tupleIndex);
- }
-
- @Override
- public int getAbsoluteFieldStartOffset(int tupleIndex, int fIdx) {
- return getInnerAccessor().getAbsoluteFieldStartOffset(tupleIndex, fIdx);
- }
-
- @Override
- public int getTupleCount() {
- return getInnerAccessor().getTupleCount();
- }
-
- @Override
- public void reset(TuplePointer tuplePointer) {
- resetInnerAccessor(tuplePointer.getFrameIndex());
- }
-
- @Override
- public void reset(ByteBuffer buffer) {
- throw new IllegalAccessError("Should never call this reset");
- }
-
- @Override
- public int getTupleEndOffset() {
- return getInnerAccessor().getTupleEndOffset(tupleId);
- }
-
- @Override
- public int getFieldEndOffset(int fieldId) {
- return getInnerAccessor().getFieldEndOffset(tupleId, fieldId);
- }
-
- @Override
- public int getFieldStartOffset(int fieldId) {
- return getInnerAccessor().getFieldStartOffset(tupleId, fieldId);
- }
-
- @Override
- public void getTuplePointer(TuplePointer tp) {
- tp.reset(frameId, tupleId);
- }
-
- @Override
- public int getTupleId() {
- return tupleId;
- }
-
- @Override
- public void setTupleId(int tupleId) {
- this.tupleId = tupleId;
- }
-
- @Override
- public void reset() {
- tupleId = INITIALIZED;
- frameId = 0;
- resetInnerAccessor(frameId);
- }
-
- @Override
- public boolean hasNext() {
- if (tupleId + 1 < getTupleCount() || frameId + 1 < getFrameCount()) {
- return true;
- }
- return false;
- }
-
- @Override
- public boolean exists() {
- return INITIALIZED < tupleId && getTupleEndOffset(tupleId) > 0 && tupleId < getTupleCount()
- && frameId < getFrameCount();
- }
-
- @Override
- public void next() {
- // TODO Consider error messages
- if (tupleId + 1 < getTupleCount()) {
- ++tupleId;
- } else if (frameId + 1 < getFrameCount()) {
- ++frameId;
- resetInnerAccessor(frameId);
- tupleId = 0;
- } else {
- // Force exists to fail, by incrementing the tuple pointer.
- ++tupleId;
- }
- }
-
-}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/AbstractTupleCursor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/AbstractTupleCursor.java
new file mode 100644
index 0000000..5734dec
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/AbstractTupleCursor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+
+public abstract class AbstractTupleCursor<T> implements ITupleCursor<T> {
+
+ public static final int UNSET = -2;
+ public static final int INITIALIZED = -1;
+ public int tupleId = UNSET;
+ protected IFrameTupleAccessor accessor;
+
+ @Override
+ public void next() {
+ ++tupleId;
+ }
+
+ @Override
+ public IFrameTupleAccessor getAccessor() {
+ return accessor;
+ }
+
+ @Override
+ public int getTupleId() {
+ return tupleId;
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/FrameTupleCursor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/FrameTupleCursor.java
new file mode 100644
index 0000000..b6c9b2e
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/FrameTupleCursor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+public class FrameTupleCursor extends AbstractTupleCursor<ByteBuffer> {
+
+ public FrameTupleCursor(RecordDescriptor recordDescriptor) {
+ accessor = new FrameTupleAccessor(recordDescriptor);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return INITIALIZED < (tupleId + 1) && (tupleId + 1) < accessor.getTupleCount();
+ }
+
+ @Override
+ public void reset(ByteBuffer byteBuffer) {
+ accessor.reset(byteBuffer);
+ tupleId = INITIALIZED;
+ }
+
+ public void resetPosition(int tupleId) {
+ this.tupleId = tupleId;
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleAccessor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleAccessor.java
deleted file mode 100644
index 77059d6..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleAccessor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public interface ITupleAccessor extends IFrameTupleAccessor {
- int getTupleStartOffset();
-
- int getTupleEndOffset();
-
- int getTupleLength();
-
- int getAbsFieldStartOffset(int fieldId);
-
- int getFieldLength(int fieldId);
-
- @Override
- int getFieldCount();
-
- @Override
- int getFieldSlotsLength();
-
- int getFieldEndOffset(int fieldId);
-
- int getFieldStartOffset(int fieldId);
-
- void reset(TuplePointer tuplePointer);
-
- @Override
- void reset(ByteBuffer buffer);
-
- int getTupleId();
-
- void setTupleId(int tupleId);
-
- void getTuplePointer(TuplePointer tp);
-
- /**
- * Only reset the iterator.
- */
- void reset();
-
- boolean hasNext();
-
- void next();
-
- boolean exists();
-}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleCursor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleCursor.java
new file mode 100644
index 0000000..8572ceb
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/ITupleCursor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+
+/**
+ * Represents an index cursor. The expected use
+ * cursor = new cursor();
+ * while(predicate){
+ * -cursor.reset()
+ * -while (cursor.hasNext()){
+ * --cursor.next()
+ * -}
+ * }
+ */
+public interface ITupleCursor<T> {
+
+ /**
+ * Checks if the Current Tuple Index Exists
+ *
+ * @return
+ */
+ boolean hasNext();
+
+ /**
+ * Increments the Tuple Index
+ *
+ */
+ void next();
+
+ /**
+ * Used in FrameTupleCursor to reset the accessor to the buffer
+ *
+ * @param param
+ */
+ void reset(T param);
+
+ /**
+ * Return the accessor
+ *
+ * @return
+ */
+ IFrameTupleAccessor getAccessor();
+
+ /**
+ * Return the tuple id.
+ *
+ * @return
+ */
+ int getTupleId();
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalJoinUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalJoinUtil.java
index 7ff5816..8a065f0 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalJoinUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalJoinUtil.java
@@ -21,22 +21,12 @@
import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
import org.apache.asterix.om.pointables.nonvisitor.AIntervalPointable;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
public class IntervalJoinUtil {
private IntervalJoinUtil() {
}
- public static void getIntervalPointable(ITupleAccessor accessor, int fieldId, TaggedValuePointable tvp,
- AIntervalPointable ip) {
- int start =
- accessor.getTupleStartOffset() + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(fieldId);
- int length = accessor.getFieldLength(fieldId);
- tvp.set(accessor.getBuffer().array(), start, length);
- tvp.getValue(ip);
- }
-
public static void getIntervalPointable(IFrameTupleAccessor accessor, int tupleId, int fieldId,
AIntervalPointable ip) {
int start = getIntervalOffset(accessor, tupleId, fieldId);
@@ -65,18 +55,4 @@
long intervalEnd = AIntervalSerializerDeserializer.getIntervalEnd(accessor.getBuffer().array(), start);
return intervalEnd;
}
-
- public static long getIntervalStart(ITupleAccessor accessor, int fieldId) {
- int start = accessor.getTupleStartOffset() + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(fieldId) + 1;
- long intervalStart = AIntervalSerializerDeserializer.getIntervalStart(accessor.getBuffer().array(), start);
- return intervalStart;
- }
-
- public static long getIntervalEnd(ITupleAccessor accessor, int fieldId) {
- int start = accessor.getTupleStartOffset() + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(fieldId) + 1;
- long intervalEnd = AIntervalSerializerDeserializer.getIntervalEnd(accessor.getBuffer().array(), start);
- return intervalEnd;
- }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalSideTuple.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalSideTuple.java
index 34c1e0e..ac30067 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalSideTuple.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalSideTuple.java
@@ -22,14 +22,11 @@
import org.apache.asterix.dataflow.data.nontagged.serde.AIntervalSerializerDeserializer;
import org.apache.asterix.runtime.operators.joins.interval.utils.IIntervalJoinUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
public class IntervalSideTuple {
// Tuple access
int fieldId;
- ITupleAccessor accessor;
- int tupleIndex;
- int frameIndex = -1;
+ ITupleCursor cursor;
long start;
long end;
@@ -37,36 +34,24 @@
// Join details
final IIntervalJoinUtil imjc;
- public IntervalSideTuple(IIntervalJoinUtil imjc, ITupleAccessor accessor, int fieldId) {
+ public IntervalSideTuple(IIntervalJoinUtil imjc, ITupleCursor cursor, int fieldId) {
this.imjc = imjc;
- this.accessor = accessor;
+ this.cursor = cursor;
this.fieldId = fieldId;
}
- public void setTuple(TuplePointer tp) {
- if (frameIndex != tp.getFrameIndex()) {
- accessor.reset(tp);
- frameIndex = tp.getFrameIndex();
- }
- tupleIndex = tp.getTupleIndex();
- int offset = IntervalJoinUtil.getIntervalOffset(accessor, tupleIndex, fieldId);
- start = AIntervalSerializerDeserializer.getIntervalStart(accessor.getBuffer().array(), offset);
- end = AIntervalSerializerDeserializer.getIntervalEnd(accessor.getBuffer().array(), offset);
- }
-
public void loadTuple() {
- tupleIndex = accessor.getTupleId();
- int offset = IntervalJoinUtil.getIntervalOffset(accessor, tupleIndex, fieldId);
- start = AIntervalSerializerDeserializer.getIntervalStart(accessor.getBuffer().array(), offset);
- end = AIntervalSerializerDeserializer.getIntervalEnd(accessor.getBuffer().array(), offset);
+ int offset = IntervalJoinUtil.getIntervalOffset(cursor.getAccessor(), cursor.getTupleId(), fieldId);
+ start = AIntervalSerializerDeserializer.getIntervalStart(cursor.getAccessor().getBuffer().array(), offset);
+ end = AIntervalSerializerDeserializer.getIntervalEnd(cursor.getAccessor().getBuffer().array(), offset);
}
public int getTupleIndex() {
- return tupleIndex;
+ return cursor.getTupleId();
}
- public ITupleAccessor getAccessor() {
- return accessor;
+ public ITupleCursor getCursor() {
+ return cursor;
}
public long getStart() {
@@ -78,14 +63,17 @@
}
public boolean compareJoin(IntervalSideTuple ist) throws HyracksDataException {
- return imjc.checkToSaveInResult(accessor, tupleIndex, ist.accessor, ist.tupleIndex);
+ return imjc.checkToSaveInResult(cursor.getAccessor(), cursor.getTupleId(), ist.cursor.getAccessor(),
+ ist.cursor.getTupleId());
}
public boolean removeFromMemory(IntervalSideTuple ist) {
- return imjc.checkToRemoveInMemory(accessor, tupleIndex, ist.accessor, ist.tupleIndex);
+ return imjc.checkToRemoveInMemory(cursor.getAccessor(), cursor.getTupleId(), ist.cursor.getAccessor(),
+ ist.cursor.getTupleId());
}
public boolean checkForEarlyExit(IntervalSideTuple ist) {
- return imjc.checkForEarlyExit(accessor, tupleIndex, ist.accessor, ist.tupleIndex);
+ return imjc.checkForEarlyExit(cursor.getAccessor(), cursor.getTupleId(), ist.cursor.getAccessor(),
+ ist.cursor.getTupleId());
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalVariableDeletableTupleMemoryManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalVariableDeletableTupleMemoryManager.java
deleted file mode 100644
index aebb932..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/IntervalVariableDeletableTupleMemoryManager.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
-
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.std.buffermanager.IFramePool;
-import org.apache.hyracks.dataflow.std.buffermanager.VariableDeletableTupleMemoryManager;
-import org.apache.hyracks.dataflow.std.sort.util.DeletableFrameTupleAppender;
-import org.apache.hyracks.dataflow.std.sort.util.IAppendDeletableFrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public class IntervalVariableDeletableTupleMemoryManager extends VariableDeletableTupleMemoryManager {
- public IntervalVariableDeletableTupleMemoryManager(IFramePool framePool, RecordDescriptor recordDescriptor) {
- super(framePool, recordDescriptor);
- }
-
- public ITupleAccessor createTupleAccessor() {
- return new AbstractTupleAccessor() {
- private IAppendDeletableFrameTupleAccessor bufferAccessor =
- new DeletableFrameTupleAppender(recordDescriptor);
-
- @Override
- protected IFrameTupleAccessor getInnerAccessor() {
- return bufferAccessor;
- }
-
- protected void resetInnerAccessor(TuplePointer tuplePointer) {
- bufferAccessor.reset(frames.get(tuplePointer.getFrameIndex()));
- }
-
- @Override
- protected void resetInnerAccessor(int frameIndex) {
- bufferAccessor.reset(frames.get(frameIndex));
- }
-
- @Override
- protected int getFrameCount() {
- return frames.size();
- }
-
- @Override
- public boolean hasNext() {
- return hasNext(frameId, tupleId);
- }
-
- @Override
- public void next() {
- tupleId = nextTuple(frameId, tupleId);
- if (tupleId > INITIALIZED) {
- return;
- }
-
- if (frameId + 1 < getFrameCount()) {
- ++frameId;
- resetInnerAccessor(frameId);
- tupleId = INITIALIZED;
- next();
- }
- }
-
- public boolean hasNext(int fId, int tId) {
- int id = nextTuple(fId, tId);
- if (id > INITIALIZED) {
- return true;
- }
- if (fId + 1 < getFrameCount()) {
- return hasNext(fId + 1, INITIALIZED);
- }
- return false;
- }
-
- public int nextTuple(int fId, int tId) {
- if (fId != frameId) {
- resetInnerAccessor(fId);
- }
- int id = nextTupleInFrame(tId);
- if (fId != frameId) {
- resetInnerAccessor(frameId);
- }
- return id;
- }
-
- public int nextTupleInFrame(int tId) {
- int id = tId;
- while (id + 1 < getTupleCount()) {
- ++id;
- if (getTupleEndOffset(id) > 0) {
- return id;
- }
- }
- return UNSET;
- }
- };
- }
-}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/RunFileStream.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/RunFileStream.java
index fafe12e..07b8a00 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/RunFileStream.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/RunFileStream.java
@@ -105,25 +105,29 @@
runFileBuffer.reset();
}
- public void addToRunFile(ITupleAccessor accessor) throws HyracksDataException {
- int idx = accessor.getTupleId();
- addToRunFile(accessor, idx);
- }
-
- public void addToRunFile(IFrameTupleAccessor accessor, int idx) throws HyracksDataException {
- if (!runFileAppender.append(accessor, idx)) {
+ public void addToRunFile(IFrameTupleAccessor accessor, int tupleId) throws HyracksDataException {
+ if (!runFileAppender.append(accessor, tupleId)) {
runFileAppender.write(runFileWriter, true);
writeCount++;
- runFileAppender.append(accessor, idx);
+ runFileAppender.append(accessor, tupleId);
}
totalTupleCount++;
}
- public void startReadingRunFile(ITupleAccessor accessor) throws HyracksDataException {
- startReadingRunFile(accessor, 0);
+ public void addToRunFile(FrameTupleCursor cursor) throws HyracksDataException {
+ if (!runFileAppender.append(cursor.getAccessor(), cursor.getTupleId())) {
+ runFileAppender.write(runFileWriter, true);
+ writeCount++;
+ runFileAppender.append(cursor.getAccessor(), cursor.getTupleId());
+ }
+ totalTupleCount++;
}
- public void startReadingRunFile(ITupleAccessor accessor, long startOffset) throws HyracksDataException {
+ public void startReadingRunFile(FrameTupleCursor cursor) throws HyracksDataException {
+ startReadingRunFile(cursor, 0);
+ }
+
+ public void startReadingRunFile(FrameTupleCursor cursor, long startOffset) throws HyracksDataException {
if (runFileReader != null) {
runFileReader.close();
}
@@ -134,15 +138,14 @@
runFileReader.seek(startOffset);
previousReadPointer = 0;
// Load first frame
- loadNextBuffer(accessor);
+ loadNextBuffer(cursor);
}
- public boolean loadNextBuffer(ITupleAccessor accessor) throws HyracksDataException {
+ public boolean loadNextBuffer(FrameTupleCursor cursor) throws HyracksDataException {
final long tempFrame = runFileReader.position();
if (runFileReader.nextFrame(runFileBuffer)) {
previousReadPointer = tempFrame;
- accessor.reset(runFileBuffer.getBuffer());
- accessor.next();
+ cursor.reset(runFileBuffer.getBuffer());
readCount++;
return true;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/TupleAccessor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/TupleAccessor.java
deleted file mode 100644
index b97df37..0000000
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/TupleAccessor.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-
-public class TupleAccessor extends FrameTupleAccessor implements ITupleAccessor {
- public static final int UNSET = -2;
- public static final int INITIALIZED = -1;
- private int tupleId = UNSET;
-
- public TupleAccessor(RecordDescriptor recordDescriptor) {
- super(recordDescriptor);
- }
-
- @Override
- public void reset(ByteBuffer buffer) {
- reset(buffer, 0, buffer.limit());
- tupleId = INITIALIZED;
- }
-
- public void reset(TuplePointer tp) {
- throw new IllegalAccessError("Should never call this reset");
- }
-
- @Override
- public int getTupleStartOffset() {
- return getTupleStartOffset(tupleId);
- }
-
- @Override
- public int getTupleEndOffset() {
- return getTupleStartOffset(tupleId);
- }
-
- @Override
- public int getTupleLength() {
- return getTupleLength(tupleId);
- }
-
- @Override
- public int getAbsFieldStartOffset(int fieldId) {
- return getAbsoluteFieldStartOffset(tupleId, fieldId);
- }
-
- @Override
- public int getFieldLength(int fieldId) {
- return getFieldLength(tupleId, fieldId);
- }
-
- @Override
- public int getFieldEndOffset(int fieldId) {
- return getFieldEndOffset(tupleId, fieldId);
- }
-
- @Override
- public int getFieldStartOffset(int fieldId) {
- return getFieldStartOffset(tupleId, fieldId);
- }
-
- @Override
- public int getTupleId() {
- return tupleId;
- }
-
- @Override
- public void getTuplePointer(TuplePointer tp) {
- tp.reset(INITIALIZED, tupleId);
- }
-
- @Override
- public void setTupleId(int tupleId) {
- this.tupleId = tupleId;
- }
-
- @Override
- public void reset() {
- tupleId = INITIALIZED;
- }
-
- @Override
- public boolean hasNext() {
- if (tupleId == UNSET) {
- return false;
- }
- return tupleId + 1 < getTupleCount();
- }
-
- @Override
- public void next() {
- ++tupleId;
- }
-
- @Override
- public boolean exists() {
- return INITIALIZED < tupleId && tupleId < getTupleCount();
- }
-}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/TuplePointerCursor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/TuplePointerCursor.java
new file mode 100644
index 0000000..b7fd6f0
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/interval/utils/memory/TuplePointerCursor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.operators.joins.interval.utils.memory;
+
+import java.util.Iterator;
+
+import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
+import org.apache.hyracks.dataflow.std.structures.TuplePointer;
+
+public class TuplePointerCursor extends AbstractTupleCursor<Iterator<TuplePointer>> {
+
+ Iterator<TuplePointer> iterator;
+ TuplePointer tp;
+
+ public TuplePointerCursor(ITuplePointerAccessor accessor) {
+ this.accessor = accessor;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public void next() {
+ TuplePointer tp = iterator.next();
+ this.tp = tp;
+ tupleId = tp.getTupleIndex();
+ ((ITuplePointerAccessor) accessor).reset(tp);
+ }
+
+ @Override
+ public void reset(Iterator<TuplePointer> iterator) {
+ this.iterator = iterator;
+ tupleId = INITIALIZED;
+ }
+
+ public void remove() {
+ iterator.remove();
+ }
+
+ public TuplePointer getTuplePointer() {
+ return tp;
+ }
+}