Addressing Reviewer's comments
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_sort_join_opts@927 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 8f2bb81..2905574 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -34,7 +34,7 @@
private final int INVALID_BUFFER = -2;
private final int UNALLOCATED_FRAME = -3;
private final int BUFFER_FOR_RESIDENT_PARTS = -1;
-
+
private IHyracksTaskContext ctx;
private final String rel0Name;
@@ -579,7 +579,7 @@
return pStatus;
}
- public String _stats_debug_getStats() {
+ public String debugGetStats() {
int numOfResidentPartitions = 0;
int numOfSpilledPartitions = 0;
double sumOfBuildSpilledSizes = 0;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index e6a7035..c6c899d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -83,7 +83,7 @@
*/
public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-
+
private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
index f9dda7f..7454523 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
@@ -9,7 +9,6 @@
/**
* @author pouria
- *
* Implements Memory Manager based on creating Binary Search Tree (BST)
* while Free slot size is the key for the BST nodes. Each node in BST
* shows a class of free slots, while all the free slots within a class
@@ -18,7 +17,6 @@
* as a separate data structure, but the free slots in the memory are
* used to hold BST nodes. Each BST node has the logical structure,
* defined in the BSTNodeUtil class.
- *
*/
public class BSTMemMgr implements IMemoryManager {
@@ -28,22 +26,15 @@
private ByteBuffer[] frames;
private ByteBuffer convertBuffer;
private Slot root;
- private Slot result; // A reusable object to hold one node returned as
- // method result
- private Slot insertSlot; // A reusable object to hold one node within insert
- // process
+ private Slot result; // A reusable object to hold one node returned as method result
+ private Slot insertSlot; // A reusable object to hold one node within insert process
+ private Slot lastLeftParent; //A reusable object for the search process
+ private Slot lastLeft; //A reusable object for the search process
+ private Slot parent; //A reusable object for the search process
private Slot[] parentRes;
private int lastFrame;
- // Variables used for debugging/performance testing
- private int debugFreeSlots = 0;
- private int debugTreeSize = 0;
- private int debugTotalLookupSteps;
- private int debugTotalLookupCounts;
- private int debugDepthCounter;
- private int debugMaxDepth;
-
public BSTMemMgr(IHyracksCommonContext ctx, int memSize) {
this.ctx = ctx;
frameSize = ctx.getFrameSize();
@@ -53,9 +44,10 @@
root = new Slot();
insertSlot = new Slot();
result = new Slot();
+ lastLeftParent = new Slot();
+ lastLeft = new Slot();
+ parent = new Slot();
parentRes = new Slot[] { new Slot(), new Slot() };
- debugTotalLookupCounts = 0;
- debugTotalLookupSteps = 0;
}
/**
@@ -63,7 +55,6 @@
*/
@Override
public void allocate(int length, Slot result) throws HyracksDataException {
- debugTotalLookupCounts++;
search(length, parentRes);
if (parentRes[1].isNull()) {
addFrame(parentRes);
@@ -77,8 +68,7 @@
if (shouldSplit(sl, acLen)) {
int[] s = split(parentRes[1], parentRes[0], acLen);
int insertLen = BSTNodeUtil.getLength(s[2], s[3], frames, convertBuffer);
- insert(s[2], s[3], insertLen); // inserting second half of the split
- // slot
+ insert(s[2], s[3], insertLen); // inserting second half of the split slot
BSTNodeUtil.setHeaderFooter(s[0], s[1], length, false, frames);
result.set(s[0], s[1]);
return;
@@ -97,8 +87,7 @@
: BSTNodeUtil.INVALID_INDEX);
int t = off + 2 * BSTNodeUtil.HEADER_SIZE + actualLen;
int nextMemSlotHeaderOffset = (t < frameSize ? t : BSTNodeUtil.INVALID_INDEX);
- // Remember: next and prev memory slots have the same frame index as the
- // unallocated slot
+ // Remember: next and prev memory slots have the same frame index as the unallocated slot
if (!isNodeNull(fix, prevMemSlotFooterOffset) && BSTNodeUtil.isFree(fix, prevMemSlotFooterOffset, frames)) {
int leftLength = BSTNodeUtil.getLength(fix, prevMemSlotFooterOffset, frames, convertBuffer);
removeFromList(fix, prevMemSlotFooterOffset - leftLength - BSTNodeUtil.HEADER_SIZE);
@@ -156,24 +145,12 @@
return frames[frameIndex];
}
- public String debugGetAvgSearchPath() {
- double avg = (((double) debugTotalLookupSteps) / ((double) debugTotalLookupCounts));
- return "\nTotal allocation requests:\t" + debugTotalLookupCounts + "\nAvg Allocation Path Length:\t" + avg
- + "\nMax BST Depth:\t" + debugMaxDepth;
- }
-
- public void debugDecLookupCount() {
- debugTotalLookupCounts--;
- }
-
/**
- *
* @param parentResult
* is the container passed by the caller to contain the results
* @throws HyracksDataException
*/
private void addFrame(Slot[] parentResult) throws HyracksDataException {
- debugDepthCounter = 0;
clear(parentResult);
if ((lastFrame + 1) >= frames.length) {
return;
@@ -192,13 +169,9 @@
}
while (!parentResult[1].isNull()) {
- debugDepthCounter++;
if (BSTNodeUtil.getLength(parentResult[1], frames, convertBuffer) == l) {
append(parentResult[1].getFrameIx(), parentResult[1].getOffset(), lastFrame, 0);
parentResult[1].set(lastFrame, 0);
- if (debugDepthCounter > debugMaxDepth) {
- debugMaxDepth = debugDepthCounter;
- }
return;
}
if (l < BSTNodeUtil.getLength(parentResult[1], frames, convertBuffer)) {
@@ -208,9 +181,6 @@
frames);
parentResult[0].copy(parentResult[1]);
parentResult[1].set(lastFrame, 0);
- if (debugDepthCounter > debugMaxDepth) {
- debugMaxDepth = debugDepthCounter;
- }
return;
} else {
parentResult[0].copy(parentResult[1]);
@@ -224,9 +194,6 @@
frames);
parentResult[0].copy(parentResult[1]);
parentResult[1].set(lastFrame, 0);
- if (debugDepthCounter > debugMaxDepth) {
- debugMaxDepth = debugDepthCounter;
- }
return;
} else {
parentResult[0].copy(parentResult[1]);
@@ -239,7 +206,6 @@
}
private void insert(int fix, int off, int length) throws HyracksDataException {
- debugDepthCounter = 0;
BSTNodeUtil.setHeaderFooter(fix, off, length, true, frames);
initNewNode(fix, off);
@@ -251,13 +217,9 @@
insertSlot.clear();
insertSlot.copy(root);
while (!insertSlot.isNull()) {
- debugDepthCounter++;
int curSlotLen = BSTNodeUtil.getLength(insertSlot, frames, convertBuffer);
if (curSlotLen == length) {
append(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off);
- if (debugDepthCounter > debugMaxDepth) {
- debugMaxDepth = debugDepthCounter;
- }
return;
}
if (length < curSlotLen) {
@@ -266,9 +228,6 @@
if (isNodeNull(leftChildFIx, leftChildOffset)) {
initNewNode(fix, off);
BSTNodeUtil.setLeftChild(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off, frames);
- if (debugDepthCounter > debugMaxDepth) {
- debugMaxDepth = debugDepthCounter;
- }
return;
} else {
insertSlot.set(leftChildFIx, leftChildOffset);
@@ -279,9 +238,6 @@
if (isNodeNull(rightChildFIx, rightChildOffset)) {
initNewNode(fix, off);
BSTNodeUtil.setRightChild(insertSlot.getFrameIx(), insertSlot.getOffset(), fix, off, frames);
- if (debugDepthCounter > debugMaxDepth) {
- debugMaxDepth = debugDepthCounter;
- }
return;
} else {
insertSlot.set(rightChildFIx, rightChildOffset);
@@ -304,13 +260,12 @@
return;
}
- Slot lastLeftParent = new Slot();
- Slot lastLeft = new Slot();
- Slot parent = new Slot();
+ lastLeftParent.clear();
+ lastLeft.clear();
+ parent.clear();
result.copy(root);
while (!result.isNull()) {
- debugTotalLookupSteps++;
if (BSTNodeUtil.getLength(result, frames, convertBuffer) == length) {
target[0].copy(parent);
target[1].copy(result);
@@ -658,16 +613,11 @@
}
public String debugPrintMemory() {
- debugFreeSlots = 0;
Slot s = new Slot(0, 0);
if (s.isNull()) {
return "memory:\tNull";
}
- if (BSTNodeUtil.isFree(0, 0, frames)) {
- debugFreeSlots++;
- }
-
String m = "memory:\n" + debugPrintSlot(0, 0) + "\n";
int length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(0, 0, frames, convertBuffer));
int noff = (length + 2 * BSTNodeUtil.HEADER_SIZE >= frameSize ? BSTNodeUtil.INVALID_INDEX : length + 2
@@ -678,9 +628,6 @@
}
s.set(nfix, noff);
while (!isNodeNull(s.getFrameIx(), s.getOffset())) {
- if (BSTNodeUtil.isFree(s.getFrameIx(), s.getOffset(), frames)) {
- debugFreeSlots++;
- }
m += debugPrintSlot(s.getFrameIx(), s.getOffset()) + "\n";
length = BSTNodeUtil.getActualLength(BSTNodeUtil.getLength(s.getFrameIx(), s.getOffset(), frames,
convertBuffer));
@@ -694,16 +641,14 @@
}
s.set(nfix, noff);
}
- return m + "\nFree Slots:\t" + debugFreeSlots;
+ return m;
}
public String debugPrintTree() {
- debugTreeSize = 0;
Slot node = new Slot();
node.copy(root);
if (!node.isNull()) {
- debugTreeSize++;
- return debugPrintSubTree(node) + "\nTree Nodes:\t" + debugTreeSize;
+ return debugPrintSubTree(node);
}
return "Null";
}
@@ -727,11 +672,9 @@
+ debugPrintSlot(lfix, loff) + ") - " + "(RC: " + debugPrintSlot(rfix, roff) + ") - " + "(NX: "
+ debugPrintSlot(nfix, noff) + ") - " + "(PR: " + debugPrintSlot(pfix, poff) + ") }\n";
if (!isNodeNull(lfix, loff)) {
- debugTreeSize++;
s += debugPrintSubTree(new Slot(lfix, loff)) + "\n";
}
if (!isNodeNull(rfix, roff)) {
- debugTreeSize++;
s += debugPrintSubTree(new Slot(rfix, roff)) + "\n";
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
index 0294dc0..3a1a629 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTNodeUtil.java
@@ -4,10 +4,8 @@
/**
* @author pouria
- *
* Implements utility methods, used extensively and repeatedly within
* the BSTMemMgr.
- *
* Mainly includes methods to set/get different types of pointers,
* required and accessed within BST traversal, along with the methods
* for setting/getting length/header/footer of free slots, which have
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
index 09984aa..98621f5 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -1,17 +1,3 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
@@ -26,14 +12,37 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+/**
+ * @author pouria
+ * This class defines the logic for merging the run, generated during
+ * the first phase of external sort (for both sorting without replacement
+ * selection and with it). For the case with replacement selection, this
+ * code also takes the limit on the output into account (if specified).
+ * If number of input runs is less than the available memory frames,
+ * then merging can be done in one pass, by allocating one buffer per
+ * run, and one buffer as the output buffer. A priorityQueue is used to
+ * find the top tuple at each iteration, among all the runs' heads in
+ * memory (check RunMergingFrameReader for more details). Otherwise,
+ * assuming that we have R runs and M memory buffers, where (R > M), we
+ * first merge first (M-1) runs and create a new sorted run, out of
+ * them. Discarding the first (M-1) runs, now merging procedure gets
+ * applied recursively on the (R-M+2) remaining runs using the M memory
+ * buffers.
+ * For the case of replacement selection, if outputLimit is specified,
+ * once the final pass is done on the runs (which is the pass
+ * that generates the final sorted output), as soon as the output size
+ * hits the output limit, the process stops, closes, and returns.
+ */
+
public class ExternalSortRunMerger {
+
private final IHyracksTaskContext ctx;
- private final FrameSorter frameSorter;
private final List<IFrameReader> runs;
private final int[] sortFields;
private final IBinaryComparator[] comparators;
@@ -44,6 +53,12 @@
private ByteBuffer outFrame;
private FrameTupleAppender outFrameAppender;
+ private final FrameSorter frameSorter; //Used in External sort, no replacement selection
+ private FrameTupleAccessor outFrameAccessor; //Used in External sort, with replacement selection
+ private final int outputLimit; //Used in External sort, with replacement selection and limit on output size
+ private int currentSize; //Used in External sort, with replacement selection and limit on output size
+
+ //Constructor for external sort, no replacement selection
public ExternalSortRunMerger(IHyracksTaskContext ctx, FrameSorter frameSorter, List<IFrameReader> runs,
int[] sortFields, IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit,
IFrameWriter writer) {
@@ -55,6 +70,22 @@
this.recordDesc = recordDesc;
this.framesLimit = framesLimit;
this.writer = writer;
+ this.outputLimit = -1;
+ }
+
+ //Constructor for external sort with replacement selection
+ public ExternalSortRunMerger(IHyracksTaskContext ctx, int outputLimit, List<IFrameReader> runs, int[] sortFields,
+ IBinaryComparator[] comparators, RecordDescriptor recordDesc, int framesLimit, IFrameWriter writer) {
+ this.ctx = ctx;
+ this.runs = new LinkedList<IFrameReader>(runs);
+ this.sortFields = sortFields;
+ this.comparators = comparators;
+ this.recordDesc = recordDesc;
+ this.framesLimit = framesLimit;
+ this.writer = writer;
+ this.outputLimit = outputLimit;
+ this.currentSize = 0;
+ this.frameSorter = null;
}
public void process() throws HyracksDataException {
@@ -128,4 +159,145 @@
}
}
}
-}
\ No newline at end of file
+
+ public void processWithReplacementSelection() throws HyracksDataException {
+ writer.open();
+
+ try {
+ outFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+ outFrame = ctx.allocateFrame();
+ outFrameAppender = new FrameTupleAppender(ctx.getFrameSize());
+ outFrameAppender.reset(outFrame, true);
+
+ if (runs.size() == 1) {
+
+ if (outputLimit < 1) {
+ runs.get(0).open();
+ ByteBuffer nextFrame = ctx.allocateFrame();
+ while (runs.get(0).nextFrame(nextFrame)) {
+ FrameUtils.flushFrame(nextFrame, writer);
+ outFrameAppender.reset(nextFrame, true);
+ }
+ return;
+ }
+
+ int totalCount = 0;
+ runs.get(0).open();
+ FrameTupleAccessor fta = new FrameTupleAccessor(ctx.getFrameSize(), recordDesc);
+ ByteBuffer nextFrame = ctx.allocateFrame();
+ while (totalCount <= outputLimit && runs.get(0).nextFrame(nextFrame)) {
+ fta.reset(nextFrame);
+ int tupCount = fta.getTupleCount();
+ if ((totalCount + tupCount) < outputLimit) {
+ FrameUtils.flushFrame(nextFrame, writer);
+ totalCount += tupCount;
+ continue;
+ }
+ // The very last buffer, which exceeds the limit
+ int copyCount = outputLimit - totalCount;
+ outFrameAppender.reset(outFrame, true);
+ for (int i = 0; i < copyCount; i++) {
+ if (!outFrameAppender.append(fta, i)) {
+ throw new IllegalStateException();
+ }
+ totalCount++;
+ }
+ }
+
+ if (outFrameAppender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(outFrame, writer);
+ outFrameAppender.reset(outFrame, true);
+ }
+
+ return;
+ }
+
+ // More than one run, actual merging is needed
+ inFrames = new ArrayList<ByteBuffer>();
+ for (int i = 0; i < framesLimit - 1; ++i) {
+ inFrames.add(ctx.allocateFrame());
+ }
+ while (runs.size() > 0) {
+ try {
+ doPassWithReplacementSelection(runs);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
+ }
+
+ // creates a new run from runs that can fit in memory.
+ private void doPassWithReplacementSelection(List<IFrameReader> runs) throws HyracksDataException {
+ FileReference newRun = null;
+ IFrameWriter writer = this.writer;
+ boolean finalPass = false;
+ if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
+ finalPass = true;
+ for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
+ inFrames.remove(i);
+ }
+ } else {
+ newRun = ctx.createManagedWorkspaceFile(ExternalSortRunMerger.class.getSimpleName());
+ writer = new RunFileWriter(newRun, ctx.getIOManager());
+ writer.open();
+ }
+ try {
+ IFrameReader[] runCursors = new RunFileReader[inFrames.size()];
+ for (int i = 0; i < inFrames.size(); i++) {
+ runCursors[i] = runs.get(i);
+ }
+ RunMergingFrameReader merger = new RunMergingFrameReader(ctx, runCursors, inFrames, sortFields,
+ comparators, recordDesc);
+ merger.open();
+ try {
+ while (merger.nextFrame(outFrame)) {
+ if (outputLimit > 0 && finalPass) {
+ outFrameAccessor.reset(outFrame);
+ int count = outFrameAccessor.getTupleCount();
+ if ((currentSize + count) > outputLimit) {
+ ByteBuffer b = ctx.allocateFrame();
+ FrameTupleAppender partialAppender = new FrameTupleAppender(ctx.getFrameSize());
+ partialAppender.reset(b, true);
+ int copyCount = outputLimit - currentSize;
+ for (int i = 0; i < copyCount; i++) {
+ partialAppender.append(outFrameAccessor, i);
+ currentSize++;
+ }
+ FrameUtils.makeReadable(b);
+ FrameUtils.flushFrame(b, writer);
+ break;
+ } else {
+ FrameUtils.flushFrame(outFrame, writer);
+ currentSize += count;
+ }
+ } else {
+ FrameUtils.flushFrame(outFrame, writer);
+ }
+ }
+ } finally {
+ merger.close();
+ }
+
+ if (outputLimit > 0 && finalPass && (currentSize >= outputLimit)) {
+ runs.clear();
+ return;
+ }
+
+ runs.subList(0, inFrames.size()).clear();
+ if (!finalPass) {
+ runs.add(0, ((RunFileWriter) writer).createReader());
+ }
+ } finally {
+ if (!finalPass) {
+ writer.close();
+ }
+ }
+ }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
index b45c5f9..1447fa6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IMemoryManager.java
@@ -8,7 +8,6 @@
/**
* @author pouria
- *
* Defines the required operations, needed for any memory manager, used
* in sorting with replacement selection, to manage the free spaces
*/
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
index 7568e26..69cf8b1 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/IRunGenerator.java
@@ -7,7 +7,6 @@
/**
* @author pouria
- *
* Interface for the Run Generator
*/
public interface IRunGenerator extends IFrameWriter {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
index 73407f3..a7d6c08 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ISelectionTree.java
@@ -2,7 +2,6 @@
/**
* @author pouria
- *
* Defines the selection tree, used in sorting with replacement
* selection to manage the order of output tuples into the runs, during
* the run generation phase. This tree contains tuples, belonging to two
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index 2f1c94c..e283c33 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -29,16 +29,13 @@
/**
* @author pouria
- *
* Operator descriptor for sorting with replacement, consisting of two
* phases:
- *
* - Run Generation: Denoted by OptimizedSortActivity below, in which
* sort runs get generated from the input data. This phases uses the
* Selection Tree and Memory Manager to benefit from the replacement
* selection optimization, to create runs which are longer than the
* available memory size.
- *
* - Merging: Denoted by MergeActivity below, in which runs (generated
* in the previous phase) get merged via a merger. Each run has a single
* buffer in memory, and a priority queue is used to select the top
@@ -191,10 +188,10 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- OptimizedExternalSortRunMerger merger = new OptimizedExternalSortRunMerger(ctx, outputLimit, runs,
- sortFields, comparators, recordDescriptors[0], memSize, writer);
+ ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, outputLimit, runs, sortFields,
+ comparators, recordDescriptors[0], memSize, writer);
- merger.process();
+ merger.processWithReplacementSelection();
}
};
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
index 1430e6f..8b9e4fe 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGenerator.java
@@ -1,7 +1,6 @@
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -21,41 +20,31 @@
/**
* @author pouria
- *
* This class implements the run generator for sorting with replacement
* selection, where there is no limit on the output, i.e. the whole data
* should be sorted. A SortMinHeap is used as the selectionTree to
* decide the order of writing tuples into the runs, while memory
* manager is based on a binary search tree to allocate tuples in the
* memory.
- *
* The overall process is as follows: - Read the input data frame by
* frame. For each tuple T in the current frame:
- *
* - Try to allocate a memory slot for writing T along with the attached
* header/footer (for memory management purpose)
- *
* - If T can not be allocated, try to output as many tuples, currently
* resident in memory, as needed so that a free slot, large enough to
* hold T, gets created. MinHeap decides about which tuple should be
* sent to the output at each step.
- *
* - Write T into the memory
- *
* - Calculate the runID of T (based on the last output tuple for the
* current run). It is either the current run or the next run. Also
* calculate Poorman's Normalized Key (PNK) for T, to make comparisons
* faster later.
- *
* - Create a heap element for T, containing: its runID, the slot
* pointer to its memory location, and its PNK.
- *
* - Insert the created heap element into the heap
- *
* - Upon closing, write all the tuples, currently resident in memory,
* into their corresponding run(s). Again min heap decides about which
* tuple is the next for output.
- *
* OptimizedSortOperatorDescriptor will merge the generated runs, to
* generate the final sorted output of the data.
*/
@@ -63,7 +52,7 @@
private final IHyracksTaskContext ctx;
private final int[] sortFields;
private final INormalizedKeyComputer nkc;
- IBinaryComparatorFactory[] comparatorFactories;
+ private final IBinaryComparatorFactory[] comparatorFactories;
private final IBinaryComparator[] comparators;
private final RecordDescriptor recordDescriptor;
private final List<IFrameReader> runs;
@@ -90,7 +79,7 @@
// the selectionTree to output
private int[] sTreeTop;
- RunFileWriter writer;
+ private RunFileWriter writer;
private boolean newRun;
private int curRunId;
@@ -148,7 +137,6 @@
}
}
memMgr.allocate(tLength, allocationPtr);
- ((BSTMemMgr) memMgr).debugDecLookupCount();
}
memMgr.writeTuple(allocationPtr.getFrameIx(), allocationPtr.getOffset(), inputAccessor, i);
int runId = getRunId(inputAccessor, i);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
index f69f676..50c9c9c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortRunGeneratorWithLimit.java
@@ -1,7 +1,6 @@
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -21,47 +20,34 @@
/**
* @author pouria
- *
* This class implements the run generator for sorting with replacement
* selection, where there is a limit on the output, i.e. we are looking
* for top-k tuples (first k smallest tuples w.r.t sorting keys).
- *
* A SortMinMaxHeap is used as the selectionTree to decide the order of
* writing tuples into the runs, and also to prune tuples (if possible).
* Memory manager is based on a binary search tree and is used to
* allocate memory slots for tuples.
- *
* The overall process is as follows (Assuming that the limit is K):
- *
* - Read the input data frame by frame. For each tuple T in the current
* frame:
- *
* - If currentRun R has reached the limit of K on the size, and (T >
* maximum tuple of R), then ignore T.
- *
* - Otherwise, try to allocate a memory slot for writing T along with
* the attached header/footer (for memory management purpose)
- *
* - If T can not be allocated, try to output as many tuples, currently
* resident in memory, as needed so that a free slot, large enough to
* hold T, gets created. MinMaxHeap decides about which tuple should be
* sent to the output at each step.
- *
* - Write T into memory.
- *
* - Calculate the runID of T (based on the last output tuple for the
* current run). It is either the current run or the next run. Also
* calculate Poorman's Normalized Key (PNK) for T, to make comparisons
* faster later.
- *
* - Create an heap element for T, containing its runID, the slot ptr to
* its memory location, and its PNK.
- *
* - If runID is the nextRun, insert the heap element into the heap, and
* increment the size of nextRun.
- *
* - If runID is the currentRun, then:
- *
* - If currentRun has not hit the limit of k, insert the element into
* the heap, and increase currentRun size. - Otherwise, currentRun has
* hit the limit of K, while T is less than the max. So discard the
@@ -69,10 +55,8 @@
* unallocating its memory location) and insert the heap element into
* the heap. No need to change the currentRun size as we are replacing
* an old element (the old max) with T.
- *
* - Upon closing, write all the tuples, currently resident in memory,
* into their corresponding run(s).
- *
* - Note that upon opening a new Run R, if size of R (based on stats)
* is S and (S > K), then (S-K) current maximum tuples of R (which are
* resident in memory) get discarded at the beginning. MinMax heap can
@@ -83,7 +67,7 @@
private final IHyracksTaskContext ctx;
private final int[] sortFields;
private final INormalizedKeyComputer nkc;
- IBinaryComparatorFactory[] comparatorFactories;
+ private final IBinaryComparatorFactory[] comparatorFactories;
private final IBinaryComparator[] comparators;
private final RecordDescriptor recordDescriptor;
private final List<IFrameReader> runs;
@@ -114,7 +98,7 @@
private Slot discard;
private int[] sTreeTop;
private int[] peek;
- RunFileWriter writer;
+ private RunFileWriter writer;
private boolean newRun;
private int curRunId;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
index 2ceea17..aaa9049 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/Slot.java
@@ -2,15 +2,12 @@
/**
* @author pouria
- *
* Defines a slot in the memory, which can be a free or used (allocated)
* slot. Memory is a set of frames, ordered as a list. Each tuple is
* stored in a slot, where the location of the slot is denoted by a pair
* of integers:
- *
* - The index of the frame, in the list of frames in memory. (referred
* to as frameIx)
- *
* - The starting offset of the slot, within that specific frame.
* (referred to as offset)
*/
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
index 383fba8..a221118 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinHeap.java
@@ -1,8 +1,7 @@
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Arrays;
import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -11,21 +10,16 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
/**
- *
* @author pouria
- *
* Implements a minimum binary heap, used as selection tree, for sort
* with replacement. This heap structure can only be used as the min
* heap (no access to the max element). Elements in the heap are
* compared based on their run numbers, and sorting key(s):
- *
* Considering two heap elements A and B:
- *
* if RunNumber(A) > RunNumber(B) then A is larger than B if
* RunNumber(A) == RunNumber(B), then A is smaller than B, if and only
* if the value of the sort key(s) in B is greater than A (based on the
* sort comparator).
- *
*/
public class SortMinHeap implements ISelectionTree {
@@ -33,17 +27,18 @@
static final int FRAME_IX = 1;
static final int OFFSET_IX = 2;
private static final int PNK_IX = 3;
+ private static final int ELEMENT_SIZE = 4;
+ private static final int INIT_ARRAY_SIZE = 512;
private final int[] sortFields;
private final IBinaryComparator[] comparators;
private final RecordDescriptor recordDescriptor;
private final FrameTupleAccessor fta1;
private final FrameTupleAccessor fta2;
-
- List<int[]> tree;
- IMemoryManager memMgr;
- int[] top; // Used as the temp variable to access the top, to avoid object
- // creation
+ private int[] elements;
+ private int nextIx;
+ private final IMemoryManager memMgr;
+ private int[] top; // Used as a temp variable to access the top, to avoid object creation
public SortMinHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDesc, IMemoryManager memMgr) {
@@ -56,8 +51,11 @@
fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
this.memMgr = memMgr;
-
- this.tree = new ArrayList<int[]>();
+ this.top = new int[ELEMENT_SIZE];
+ Arrays.fill(top, -1);
+ this.elements = new int[INIT_ARRAY_SIZE];
+ Arrays.fill(elements, -1);
+ this.nextIx = 0;
}
/*
@@ -65,7 +63,7 @@
*/
@Override
public void getMin(int[] result) {
- if (tree.size() == 0) {
+ if (nextIx == 0) {
result[0] = result[1] = result[2] = result[3] = -1;
return;
}
@@ -78,49 +76,56 @@
@Override
public void peekMin(int[] result) {
- if (tree.size() == 0) {
+ if (nextIx == 0) {
result[0] = result[1] = result[2] = result[3] = -1;
return;
}
-
- top = tree.get(0);
- for (int i = 0; i < top.length; i++) {
- result[i] = top[i];
+ for (int i = 0; i < ELEMENT_SIZE; i++) {
+ result[i] = elements[i];
}
}
@Override
public void insert(int[] e) {
- tree.add(e);
- siftUp(tree.size() - 1);
+ if (nextIx >= elements.length) {
+ elements = Arrays.copyOf(elements, elements.length * 2);
+ }
+ for (int i = 0; i < ELEMENT_SIZE; i++) {
+ elements[nextIx + i] = e[i];
+ }
+ siftUp(nextIx);
+ nextIx += ELEMENT_SIZE;
+
}
@Override
public void reset() {
- this.tree.clear();
+ Arrays.fill(elements, -1);
+ nextIx = 0;
}
@Override
public boolean isEmpty() {
- return (tree.size() < 1);
+ return (nextIx < ELEMENT_SIZE);
}
public int _debugGetSize() {
- return tree.size();
+ return (nextIx > 0 ? (nextIx - 1) / 4 : 0);
}
private int[] delete(int nix) {
- int[] nv = tree.get(nix);
- int[] last = tree.remove(tree.size() - 1);
+ int[] nv = Arrays.copyOfRange(elements, nix, nix + ELEMENT_SIZE);
+ int[] lastElem = removeLast();
- if (tree.size() > 0) {
- tree.set(nix, last);
- } else {
+ if (nextIx == 0) {
return nv;
}
+ for (int i = 0; i < ELEMENT_SIZE; i++) {
+ elements[nix + i] = lastElem[i];
+ }
int pIx = getParent(nix);
- if (pIx > -1 && (compare(last, tree.get(pIx)) < 0)) {
+ if (pIx > -1 && (compare(lastElem, Arrays.copyOfRange(elements, pIx, pIx + ELEMENT_SIZE)) < 0)) {
siftUp(nix);
} else {
siftDown(nix);
@@ -128,6 +133,16 @@
return nv;
}
+ private int[] removeLast() {
+ if (nextIx < ELEMENT_SIZE) { //this is the very last element
+ return new int[] { -1, -1, -1, -1 };
+ }
+ int[] l = Arrays.copyOfRange(elements, nextIx - ELEMENT_SIZE, nextIx);
+ Arrays.fill(elements, nextIx - ELEMENT_SIZE, nextIx, -1);
+ nextIx -= ELEMENT_SIZE;
+ return l;
+ }
+
private void siftUp(int nodeIx) {
int p = getParent(nodeIx);
if (p < 0) {
@@ -160,8 +175,8 @@
// first < sec : -1
private int compare(int nodeSIx1, int nodeSIx2) {
- int[] n1 = tree.get(nodeSIx1);
- int[] n2 = tree.get(nodeSIx2);
+ int[] n1 = Arrays.copyOfRange(elements, nodeSIx1, nodeSIx1 + ELEMENT_SIZE);
+ int[] n2 = Arrays.copyOfRange(elements, nodeSIx2, nodeSIx2 + ELEMENT_SIZE);
return (compare(n1, n2));
}
@@ -220,24 +235,30 @@
return ((compare(lix, rix) < 0) ? lix : rix);
}
+ //Assumption: n1Ix and n2Ix are starting indices of two elements
private void swap(int n1Ix, int n2Ix) {
- int[] temp = tree.get(n1Ix);
- tree.set(n1Ix, tree.get(n2Ix));
- tree.set(n2Ix, temp);
+ int[] temp = Arrays.copyOfRange(elements, n1Ix, n1Ix + ELEMENT_SIZE);
+ for (int i = 0; i < ELEMENT_SIZE; i++) {
+ elements[n1Ix + i] = elements[n2Ix + i];
+ elements[n2Ix + i] = temp[i];
+ }
}
private int getLeftChild(int ix) {
- int lix = 2 * ix + 1;
- return ((lix < tree.size()) ? lix : -1);
+ int lix = (2 * ELEMENT_SIZE) * (ix / ELEMENT_SIZE) + ELEMENT_SIZE;
+ return ((lix < nextIx) ? lix : -1);
}
private int getRightChild(int ix) {
- int rix = 2 * ix + 2;
- return ((rix < tree.size()) ? rix : -1);
+ int rix = (2 * ELEMENT_SIZE) * (ix / ELEMENT_SIZE) + (2 * ELEMENT_SIZE);
+ return ((rix < nextIx) ? rix : -1);
}
private int getParent(int ix) {
- return ((ix - 1) / 2);
+ if (ix <= 0) {
+ return -1;
+ }
+ return ((ix - ELEMENT_SIZE) / (2 * ELEMENT_SIZE)) * ELEMENT_SIZE;
}
private ByteBuffer getFrame(int frameIx) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
index c040ccc..dcaf781 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/SortMinMaxHeap.java
@@ -1,8 +1,7 @@
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Arrays;
import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
@@ -12,7 +11,6 @@
/**
* @author pouria
- *
* Implements a MinMax binary heap, used as the selection tree, in
* sorting with replacement. Check SortMinHeap for details on comparing
* elements.
@@ -23,6 +21,8 @@
static final int OFFSET_IX = 2;
private static final int PNK_IX = 3;
private static final int NOT_EXIST = -1;
+ private static final int ELEMENT_SIZE = 4;
+ private static final int INIT_ARRAY_SIZE = 512;
private final int[] sortFields;
private final IBinaryComparator[] comparators;
@@ -30,11 +30,10 @@
private final FrameTupleAccessor fta1;
private final FrameTupleAccessor fta2;
- List<int[]> tree;
- IMemoryManager memMgr;
+ private int[] elements;
+ private int nextIx;
- int[] top; // Used as the temp variable to access the top, to avoid object
- // creation
+ private final IMemoryManager memMgr;
public SortMinMaxHeap(IHyracksCommonContext ctx, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDesc, IMemoryManager memMgr) {
@@ -47,124 +46,143 @@
fta1 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
fta2 = new FrameTupleAccessor(ctx.getFrameSize(), recordDescriptor);
this.memMgr = memMgr;
-
- this.tree = new ArrayList<int[]>();
+ this.elements = new int[INIT_ARRAY_SIZE];
+ Arrays.fill(elements, -1);
+ this.nextIx = 0;
}
@Override
public void insert(int[] element) {
- tree.add(element);
- bubbleUp(tree.size() - 1);
+ if (nextIx >= elements.length) {
+ elements = Arrays.copyOf(elements, elements.length * 2);
+ }
+ for (int i = 0; i < ELEMENT_SIZE; i++) {
+ elements[nextIx + i] = element[i];
+ }
+ nextIx += ELEMENT_SIZE;
+ bubbleUp(nextIx - ELEMENT_SIZE);
}
@Override
public void getMin(int[] result) {
- if (tree.size() == 0) {
+ if (nextIx == 0) {
result[0] = result[1] = result[2] = result[3] = -1;
return;
}
- top = delete(0);
- for (int i = 0; i < top.length; i++) {
- result[i] = top[i];
+ int[] topElem = delete(0);
+ for (int x = 0; x < ELEMENT_SIZE; x++) {
+ result[x] = topElem[x];
}
}
@Override
public void reset() {
- this.tree.clear();
+ Arrays.fill(elements, -1);
+ nextIx = 0;
}
@Override
public boolean isEmpty() {
- return (tree.size() < 1);
+ return (nextIx < ELEMENT_SIZE);
}
@Override
public void peekMin(int[] result) {
- if (tree.size() == 0) {
+ if (nextIx == 0) {
result[0] = result[1] = result[2] = result[3] = -1;
return;
}
- top = tree.get(0);
- for (int i = 0; i < top.length; i++) {
- result[i] = top[i];
+ for (int x = 0; x < ELEMENT_SIZE; x++) {
+ result[x] = elements[x];
}
}
@Override
public void getMax(int[] result) {
- if (tree.size() == 1) {
- top = tree.remove(0);
- for (int i = 0; i < top.length; i++) {
- result[i] = top[i];
+ if (nextIx == ELEMENT_SIZE) {
+ int[] topElement = removeLast();
+ for (int x = 0; x < ELEMENT_SIZE; x++) {
+ result[x] = topElement[x];
}
return;
}
- if (tree.size() > 1) {
+ if (nextIx > ELEMENT_SIZE) {
int lc = getLeftChild(0);
int rc = getRightChild(0);
+ int maxIx = lc;
- if (rc == -1) {
- top = delete(lc);
- } else {
- top = (compare(lc, rc) < 0) ? delete(rc) : delete(lc);
+ if (rc != -1) {
+ maxIx = compare(lc, rc) < 0 ? rc : lc;
}
- for (int i = 0; i < top.length; i++) {
- result[i] = top[i];
+ int[] maxElem = delete(maxIx);
+ for (int x = 0; x < ELEMENT_SIZE; x++) {
+ result[x] = maxElem[x];
}
return;
-
}
+
result[0] = result[1] = result[2] = result[3] = -1;
+
}
@Override
public void peekMax(int[] result) {
- if (tree.size() == 1) {
- top = tree.get(0);
- for (int i = 0; i < top.length; i++) {
- result[i] = top[i];
+ if (nextIx == ELEMENT_SIZE) {
+ for (int i = 0; i < ELEMENT_SIZE; i++) {
+ result[i] = elements[i];
}
return;
}
- if (tree.size() > 1) {
+ if (nextIx > ELEMENT_SIZE) {
int lc = getLeftChild(0);
int rc = getRightChild(0);
+ int maxIx = lc;
- if (rc == -1) {
- top = tree.get(lc);
- } else {
- top = (compare(lc, rc) < 0) ? tree.get(rc) : tree.get(lc);
+ if (rc != -1) {
+ maxIx = compare(lc, rc) < 0 ? rc : lc;
}
- for (int i = 0; i < top.length; i++) {
- result[i] = top[i];
+ for (int x = 0; x < ELEMENT_SIZE; x++) {
+ result[x] = elements[maxIx + x];
}
+
return;
}
result[0] = result[1] = result[2] = result[3] = -1;
}
private int[] delete(int delIx) {
- int s = tree.size();
- if (s > 1) {
- int[] delEntry = tree.get(delIx);
- int[] last = (tree.remove(s - 1));
- if (delIx != tree.size()) {
- tree.set(delIx, last);
+ int s = nextIx;
+ if (nextIx > ELEMENT_SIZE) {
+ int[] delEntry = Arrays.copyOfRange(elements, delIx, delIx + ELEMENT_SIZE);
+ int[] last = removeLast();
+ if (delIx != (s - ELEMENT_SIZE)) {
+ for (int x = 0; x < ELEMENT_SIZE; x++) {
+ elements[delIx + x] = last[x];
+ }
trickleDown(delIx);
}
return delEntry;
- } else if (s == 1) {
- return tree.remove(0);
+ } else if (nextIx == ELEMENT_SIZE) {
+ return (removeLast());
}
return null;
}
+ private int[] removeLast() {
+ if (nextIx < ELEMENT_SIZE) { //this is the very last element
+ return new int[] { -1, -1, -1, -1 };
+ }
+ int[] l = Arrays.copyOfRange(elements, nextIx - ELEMENT_SIZE, nextIx);
+ Arrays.fill(elements, nextIx - ELEMENT_SIZE, nextIx, -1);
+ nextIx -= ELEMENT_SIZE;
+ return l;
+ }
+
private void bubbleUp(int ix) {
int p = getParentIx(ix);
if (isAtMinLevel(ix)) {
@@ -301,16 +319,18 @@
}
private void swap(int n1Ix, int n2Ix) {
- int[] temp = tree.get(n1Ix);
- tree.set(n1Ix, tree.get(n2Ix));
- tree.set(n2Ix, temp);
+ int[] temp = Arrays.copyOfRange(elements, n1Ix, n1Ix + ELEMENT_SIZE);
+ for (int i = 0; i < ELEMENT_SIZE; i++) {
+ elements[n1Ix + i] = elements[n2Ix + i];
+ elements[n2Ix + i] = temp[i];
+ }
}
private int getParentIx(int i) {
- if (i == 0) {
+ if (i < ELEMENT_SIZE) {
return NOT_EXIST;
}
- return (i - 1) / 2;
+ return ((i - ELEMENT_SIZE) / (2 * ELEMENT_SIZE)) * ELEMENT_SIZE;
}
private int getGrandParent(int i) {
@@ -319,8 +339,8 @@
}
private int getLeftChild(int i) {
- int lc = 2 * i + 1;
- return lc < tree.size() ? lc : NOT_EXIST;
+ int lc = (2 * ELEMENT_SIZE) * (i / ELEMENT_SIZE) + ELEMENT_SIZE;
+ return (lc < nextIx ? lc : -1);
}
private int[] getLeftGrandChildren(int i) {
@@ -329,8 +349,8 @@
}
private int getRightChild(int i) {
- int rc = 2 * i + 2;
- return rc < tree.size() ? rc : -1;
+ int rc = (2 * ELEMENT_SIZE) * (i / ELEMENT_SIZE) + (2 * ELEMENT_SIZE);
+ return (rc < nextIx ? rc : -1);
}
private int[] getRightGrandChildren(int i) {
@@ -344,11 +364,13 @@
}
private int getLevel(int i) {
- if (i == 0) {
+ if (i < ELEMENT_SIZE) {
return 0;
}
- int l = (int) Math.floor(Math.log(i) / Math.log(2));
- if (i == (((int) Math.pow(2, (l + 1))) - 1)) {
+
+ int cnv = i / ELEMENT_SIZE;
+ int l = (int) Math.floor(Math.log(cnv) / Math.log(2));
+ if (cnv == (((int) Math.pow(2, (l + 1))) - 1)) {
return (l + 1);
}
return l;
@@ -360,8 +382,8 @@
// first < sec : -1
private int compare(int nodeSIx1, int nodeSIx2) {
- int[] n1 = tree.get(nodeSIx1);
- int[] n2 = tree.get(nodeSIx2);
+ int[] n1 = Arrays.copyOfRange(elements, nodeSIx1, nodeSIx1 + ELEMENT_SIZE); //tree.get(nodeSIx1);
+ int[] n2 = Arrays.copyOfRange(elements, nodeSIx2, nodeSIx2 + ELEMENT_SIZE); //tree.get(nodeSIx2);
return (compare(n1, n2));
}
@@ -407,13 +429,4 @@
}
return 0;
}
-
- public String _debugPrintTree() {
- String s = "";
- for (int i = 0; i < tree.size(); i++) {
- int[] n = tree.get(i);
- s += "\t[" + i + "](" + n[RUN_ID_IX] + ", " + n[FRAME_IX] + ", " + n[OFFSET_IX] + "), ";
- }
- return s;
- }
}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
index c7aa1a1..91214c8 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/OptimizedSortMergeTest.java
@@ -90,6 +90,8 @@
runTest(spec);
}
+
+
@Test
public void optimizedSortMergeTest02() throws Exception {
JobSpecification spec = new JobSpecification();
@@ -113,7 +115,7 @@
UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
- int outputLimit = 15;
+ int outputLimit = 200;
OptimizedExternalSortOperatorDescriptor sorter = new OptimizedExternalSortOperatorDescriptor(spec, 4, outputLimit, new int[] { 1, 0 },
new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE,
UTF8StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
@@ -137,4 +139,5 @@
runTest(spec);
}
+
}
\ No newline at end of file