Merged online_aggregation @186:220
git-svn-id: https://hyracks.googlecode.com/svn/trunk/hyracks@221 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/.classpath b/hyracks-dataflow-std/.classpath
index 86f50f4..31cf404 100644
--- a/hyracks-dataflow-std/.classpath
+++ b/hyracks-dataflow-std/.classpath
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java"/>
- <classpathentry kind="src" path="src/test/java"/>
+ <classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
<classpathentry kind="output" path="target/classes"/>
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 228d22b..36a49ee 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -15,24 +15,13 @@
package edu.uci.ics.hyracks.dataflow.std.sort;
import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
import java.util.List;
-import edu.uci.ics.hyracks.api.comm.IFrameReader;
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksContext;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -40,15 +29,10 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
-import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
public class ExternalSortOperatorDescriptor extends AbstractOperatorDescriptor {
private static final String FRAMESORTER = "framesorter";
@@ -104,61 +88,29 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- final FrameSorter frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory,
- comparatorFactories, recordDescriptors[0]);
- final int maxSortFrames = framesLimit - 1;
+ final ExternalSortRunGenerator runGen = new ExternalSortRunGenerator(ctx, sortFields,
+ firstKeyNormalizerFactory, comparatorFactories, recordDescriptors[0], framesLimit);
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private LinkedList<File> runs;
-
@Override
public void open() throws HyracksDataException {
- runs = new LinkedList<File>();
- frameSorter.reset();
+ runGen.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- if (frameSorter.getFrameCount() >= maxSortFrames) {
- flushFramesToRun();
- }
- frameSorter.insertFrame(buffer);
+ runGen.nextFrame(buffer);
}
@Override
public void close() throws HyracksDataException {
- if (frameSorter.getFrameCount() > 0) {
- if (runs.size() <= 0) {
- frameSorter.sortFrames();
- env.set(FRAMESORTER, frameSorter);
- } else {
- flushFramesToRun();
- }
- }
- env.set(RUNS, runs);
- }
-
- private void flushFramesToRun() throws HyracksDataException {
- frameSorter.sortFrames();
- File runFile;
- try {
- runFile = ctx.getResourceManager().createFile(
- ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- RunFileWriter writer = new RunFileWriter(runFile);
- writer.open();
- try {
- frameSorter.flushFrames(writer);
- } finally {
- writer.close();
- }
- frameSorter.reset();
- runs.add(runFile);
+ runGen.close();
+ env.set(FRAMESORTER, runGen.getFrameSorter());
+ env.set(RUNS, runGen.getRuns());
}
@Override
public void flush() throws HyracksDataException {
+ runGen.flush();
}
};
return op;
@@ -176,283 +128,19 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
- private List<ByteBuffer> inFrames;
- private ByteBuffer outFrame;
- LinkedList<File> runs;
- private FrameTupleAppender outFrameAppender;
-
@Override
public void initialize() throws HyracksDataException {
- runs = (LinkedList<File>) env.get(RUNS);
- writer.open();
- try {
- if (runs.size() <= 0) {
- FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
- if (frameSorter != null) {
- frameSorter.flushFrames(writer);
- }
- env.set(FRAMESORTER, null);
- } else {
- inFrames = new ArrayList<ByteBuffer>();
- outFrame = ctx.getResourceManager().allocateFrame();
- outFrameAppender = new FrameTupleAppender(ctx);
- outFrameAppender.reset(outFrame, true);
- for (int i = 0; i < framesLimit - 1; ++i) {
- inFrames.add(ctx.getResourceManager().allocateFrame());
- }
- int passCount = 0;
- while (runs.size() > 0) {
- passCount++;
- try {
- doPass(runs, passCount);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
- }
- } finally {
- writer.close();
- }
+ List<File> runs = (List<File>) env.get(RUNS);
+ FrameSorter frameSorter = (FrameSorter) env.get(FRAMESORTER);
+ ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, frameSorter, runs, sortFields,
+ comparatorFactories, recordDescriptors[0], framesLimit, writer);
+ merger.process();
+ env.set(FRAMESORTER, null);
env.set(RUNS, null);
}
-
- // creates a new run from runs that can fit in memory.
- private void doPass(LinkedList<File> runs, int passCount) throws HyracksDataException, IOException {
- File newRun = null;
- IFrameWriter writer = this.writer;
- boolean finalPass = false;
- if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
- finalPass = true;
- for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
- inFrames.remove(i);
- }
- } else {
- newRun = ctx.getResourceManager().createFile(
- ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
- writer = new RunFileWriter(newRun);
- writer.open();
- }
- try {
- RunFileReader[] runCursors = new RunFileReader[inFrames.size()];
- FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
- Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
- ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx, recordDescriptors[0],
- inFrames.size(), comparator);
- int[] tupleIndexes = new int[inFrames.size()];
- for (int i = 0; i < inFrames.size(); i++) {
- tupleIndexes[i] = 0;
- int runIndex = topTuples.peek().getRunid();
- runCursors[runIndex] = new RunFileReader(runs.get(runIndex));
- runCursors[runIndex].open();
- if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
- tupleAccessors[runIndex] = new FrameTupleAccessor(ctx, recordDescriptors[0]);
- tupleAccessors[runIndex].reset(inFrames.get(runIndex));
- setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
- } else {
- closeRun(runIndex, runCursors, tupleAccessors);
- }
- }
-
- while (!topTuples.areRunsExhausted()) {
- ReferenceEntry top = topTuples.peek();
- int runIndex = top.getRunid();
- FrameTupleAccessor fta = top.getAccessor();
- int tupleIndex = top.getTupleIndex();
-
- if (!outFrameAppender.append(fta, tupleIndex)) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- if (!outFrameAppender.append(fta, tupleIndex)) {
- throw new IllegalStateException();
- }
- }
-
- ++tupleIndexes[runIndex];
- setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
- }
- if (outFrameAppender.getTupleCount() > 0) {
- FrameUtils.flushFrame(outFrame, writer);
- outFrameAppender.reset(outFrame, true);
- }
- runs.subList(0, inFrames.size()).clear();
- if (!finalPass) {
- runs.add(0, newRun);
- }
- } finally {
- if (!finalPass) {
- writer.close();
- }
- }
- }
-
- private void setNextTopTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws IOException {
- boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
- if (exists) {
- topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
- } else {
- topTuples.pop();
- closeRun(runIndex, runCursors, tupleAccessors);
- }
- }
-
- private boolean hasNextTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
- FrameTupleAccessor[] tupleAccessors) throws IOException {
- if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
- return false;
- } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
- ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
- if (runCursors[runIndex].nextFrame(buf)) {
- tupleIndexes[runIndex] = 0;
- return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
- } else {
- return false;
- }
- } else {
- return true;
- }
- }
-
- private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
- throws HyracksDataException {
- runCursors[index].close();
- runCursors[index] = null;
- tupleAccessor[index] = null;
- }
};
return op;
}
}
-
- private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
- return new Comparator<ReferenceEntry>() {
- public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
- FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
- FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
- int j1 = (Integer) tp1.getTupleIndex();
- int j2 = (Integer) tp2.getTupleIndex();
- byte[] b1 = fta1.getBuffer().array();
- byte[] b2 = fta2.getBuffer().array();
- for (int f = 0; f < sortFields.length; ++f) {
- int fIdx = sortFields[f];
- int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
- + fta1.getFieldStartOffset(j1, fIdx);
- int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
- int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
- + fta2.getFieldStartOffset(j2, fIdx);
- int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
- int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
- if (c != 0) {
- return c;
- }
- }
- return 0;
- }
- };
- }
-
- private class RunFileWriter implements IFrameWriter {
- private final File file;
- private FileChannel channel;
-
- public RunFileWriter(File file) {
- this.file = file;
- }
-
- @Override
- public void open() throws HyracksDataException {
- RandomAccessFile raf;
- try {
- raf = new RandomAccessFile(file, "rw");
- } catch (FileNotFoundException e) {
- throw new HyracksDataException(e);
- }
- channel = raf.getChannel();
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- int remain = buffer.capacity();
- while (remain > 0) {
- int len;
- try {
- len = channel.write(buffer);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- if (len < 0) {
- throw new HyracksDataException("Error writing data");
- }
- remain -= len;
- }
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (channel != null) {
- try {
- channel.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
- @Override
- public void flush() throws HyracksDataException {
- }
- }
-
- public static class RunFileReader implements IFrameReader {
- private final File file;
- private FileChannel channel;
-
- public RunFileReader(File file) throws FileNotFoundException {
- this.file = file;
- }
-
- @Override
- public void open() throws HyracksDataException {
- RandomAccessFile raf;
- try {
- raf = new RandomAccessFile(file, "r");
- } catch (FileNotFoundException e) {
- throw new HyracksDataException(e);
- }
- channel = raf.getChannel();
- }
-
- @Override
- public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
- buffer.clear();
- int remain = buffer.capacity();
- while (remain > 0) {
- int len;
- try {
- len = channel.read(buffer);
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- if (len < 0) {
- return false;
- }
- remain -= len;
- }
- return true;
- }
-
- @Override
- public void close() throws HyracksDataException {
- try {
- channel.close();
- } catch (IOException e) {
- throw new HyracksDataException(e);
- }
- }
- }
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
new file mode 100644
index 0000000..e2a08c2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunGenerator.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ExternalSortRunGenerator implements IFrameWriter {
+ private final IHyracksContext ctx;
+ private final FrameSorter frameSorter;
+ private final List<File> runs;
+ private final int maxSortFrames;
+
+ public ExternalSortRunGenerator(IHyracksContext ctx, int[] sortFields,
+ INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDesc, int framesLimit) {
+ this.ctx = ctx;
+ frameSorter = new FrameSorter(ctx, sortFields, firstKeyNormalizerFactory, comparatorFactories, recordDesc);
+ runs = new LinkedList<File>();
+ maxSortFrames = framesLimit - 1;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ runs.clear();
+ frameSorter.reset();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ if (frameSorter.getFrameCount() >= maxSortFrames) {
+ flushFramesToRun();
+ }
+ frameSorter.insertFrame(buffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (frameSorter.getFrameCount() > 0) {
+ if (runs.size() <= 0) {
+ frameSorter.sortFrames();
+ } else {
+ flushFramesToRun();
+ }
+ }
+ }
+
+ private void flushFramesToRun() throws HyracksDataException {
+ frameSorter.sortFrames();
+ File runFile;
+ try {
+ runFile = ctx.getResourceManager().createFile(ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ RunFileWriter writer = new RunFileWriter(runFile);
+ writer.open();
+ try {
+ frameSorter.flushFrames(writer);
+ } finally {
+ writer.close();
+ }
+ frameSorter.reset();
+ runs.add(runFile);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
+
+ public FrameSorter getFrameSorter() {
+ return frameSorter;
+ }
+
+ public List<File> getRuns() {
+ return runs;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
new file mode 100644
index 0000000..91114b2
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortRunMerger.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
+import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
+
+/**
+ * Merge phase of an external sort.
+ * <p>
+ * With no runs, the in-memory {@link FrameSorter} (if non-null) is flushed straight to the output writer.
+ * Otherwise the runs are merged in passes using {@code framesLimit - 1} input frames and one output frame. While
+ * there are more runs than input frames, a pass merges the first {@code framesLimit - 1} runs into a new run file
+ * that is put at the head of the run list; once the runs fit, a final pass merges them into the output writer.
+ * <p>
+ * The supplied run list is consumed: merged runs are removed from it. NOTE(review): merged run files are not
+ * deleted here; confirm that their cleanup is handled elsewhere.
+ */
+public class ExternalSortRunMerger {
+    private final IHyracksContext ctx;
+    /** Sorted in-memory data, used only when there are no runs; may be null. */
+    private final FrameSorter frameSorter;
+    private final List<File> runs;
+    private final int[] sortFields;
+    private final IBinaryComparator[] comparators;
+    private final RecordDescriptor recordDesc;
+    private final int framesLimit;
+    private final IFrameWriter writer;
+    private List<ByteBuffer> inFrames;
+    private ByteBuffer outFrame;
+    private FrameTupleAppender outFrameAppender;
+
+    public ExternalSortRunMerger(IHyracksContext ctx, FrameSorter frameSorter, List<File> runs, int[] sortFields,
+            IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDesc, int framesLimit,
+            IFrameWriter writer) {
+        this.ctx = ctx;
+        this.frameSorter = frameSorter;
+        this.runs = runs;
+        this.sortFields = sortFields;
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.recordDesc = recordDesc;
+        this.framesLimit = framesLimit;
+        this.writer = writer;
+    }
+
+    /**
+     * Merges the runs, or flushes the in-memory data when there are none.
+     *
+     * @param doFinalPass
+     *            if true, the output writer is opened and closed here and all data is merged into it. If false,
+     *            only intermediate passes are run: runs are merged into new run files until the remaining runs
+     *            fit into a single final pass, which is left undone (those runs stay in the run list). Note that
+     *            with no runs the in-memory data is flushed to the writer even when this is false.
+     */
+    public void process(boolean doFinalPass) throws HyracksDataException {
+        if (doFinalPass) {
+            writer.open();
+        }
+        try {
+            if (runs.size() <= 0) {
+                if (frameSorter != null) {
+                    frameSorter.flushFrames(writer);
+                }
+            } else {
+                inFrames = new ArrayList<ByteBuffer>();
+                outFrame = ctx.getResourceManager().allocateFrame();
+                outFrameAppender = new FrameTupleAppender(ctx);
+                outFrameAppender.reset(outFrame, true);
+                for (int i = 0; i < framesLimit - 1; ++i) {
+                    inFrames.add(ctx.getResourceManager().allocateFrame());
+                }
+                int passCount = 0;
+                while (runs.size() > 0) {
+                    if (!doFinalPass && runs.size() + 1 <= framesLimit) {
+                        // Only the final pass is left and the caller asked us to skip it. doPass() would return
+                        // without consuming a run, so looping again would never terminate.
+                        break;
+                    }
+                    passCount++;
+                    try {
+                        doPass(runs, passCount, doFinalPass);
+                    } catch (HyracksDataException e) {
+                        throw e; // already the declared type; do not wrap it a second time
+                    } catch (Exception e) {
+                        throw new HyracksDataException(e);
+                    }
+                }
+            }
+        } finally {
+            if (doFinalPass) {
+                writer.close();
+            }
+        }
+    }
+
+    /** Runs the complete merge, including the final pass into the output writer. */
+    public void process() throws HyracksDataException {
+        process(true);
+    }
+
+    /**
+     * Performs one merge pass over the head of {@code runs}.
+     * <p>
+     * If the remaining runs fit into the input frames (plus the output frame) this is the final pass: the runs are
+     * merged into the output writer, or nothing happens when {@code doFinalPass} is false. Otherwise the first
+     * {@code inFrames.size()} runs are merged into a new run file that is inserted at the head of {@code runs}.
+     * Merged runs are removed from {@code runs}.
+     *
+     * @throws HyracksDataException
+     *             if an intermediate pass is needed but fewer than two input frames are available
+     */
+    private void doPass(List<File> runs, int passCount, boolean doFinalPass) throws HyracksDataException, IOException {
+        File newRun = null;
+        IFrameWriter writer = this.writer;
+        boolean finalPass = false;
+        if (runs.size() + 1 <= framesLimit) { // + 1 outFrame
+            if (!doFinalPass) {
+                return;
+            }
+            finalPass = true;
+            // Keep exactly one input frame per remaining run.
+            for (int i = inFrames.size() - 1; i >= runs.size(); i--) {
+                inFrames.remove(i);
+            }
+        } else {
+            if (inFrames.size() < 2) {
+                // A pass replaces the runs it reads by one new run, so reading fewer than two runs would never
+                // shrink the run list and process() would loop forever.
+                throw new HyracksDataException("framesLimit (" + framesLimit + ") is too small to merge "
+                        + runs.size() + " runs");
+            }
+            newRun = ctx.getResourceManager().createFile(
+                    ExternalSortOperatorDescriptor.class.getSimpleName(), ".run");
+            writer = new RunFileWriter(newRun);
+            writer.open();
+        }
+        RunFileReader[] runCursors = new RunFileReader[inFrames.size()];
+        try {
+            FrameTupleAccessor[] tupleAccessors = new FrameTupleAccessor[inFrames.size()];
+            Comparator<ReferenceEntry> comparator = createEntryComparator(comparators);
+            ReferencedPriorityQueue topTuples = new ReferencedPriorityQueue(ctx, recordDesc, inFrames.size(),
+                    comparator);
+            int[] tupleIndexes = new int[inFrames.size()];
+            // Open each input run and load its first frame. NOTE(review): the run to open is taken from
+            // topTuples.peek(), assuming the new queue yields a not-yet-opened run id on every iteration; confirm
+            // against ReferencedPriorityQueue.
+            for (int i = 0; i < inFrames.size(); i++) {
+                tupleIndexes[i] = 0;
+                int runIndex = topTuples.peek().getRunid();
+                runCursors[runIndex] = new RunFileReader(runs.get(runIndex));
+                runCursors[runIndex].open();
+                if (runCursors[runIndex].nextFrame(inFrames.get(runIndex))) {
+                    tupleAccessors[runIndex] = new FrameTupleAccessor(ctx, recordDesc);
+                    tupleAccessors[runIndex].reset(inFrames.get(runIndex));
+                    setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+                } else {
+                    closeRun(runIndex, runCursors, tupleAccessors);
+                }
+            }
+
+            // Repeatedly emit the head tuple of the queue, then advance the run it came from.
+            while (!topTuples.areRunsExhausted()) {
+                ReferenceEntry top = topTuples.peek();
+                int runIndex = top.getRunid();
+                FrameTupleAccessor fta = top.getAccessor();
+                int tupleIndex = top.getTupleIndex();
+
+                if (!outFrameAppender.append(fta, tupleIndex)) {
+                    FrameUtils.flushFrame(outFrame, writer);
+                    outFrameAppender.reset(outFrame, true);
+                    if (!outFrameAppender.append(fta, tupleIndex)) {
+                        throw new IllegalStateException();
+                    }
+                }
+
+                ++tupleIndexes[runIndex];
+                setNextTopTuple(runIndex, tupleIndexes, runCursors, tupleAccessors, topTuples);
+            }
+            if (outFrameAppender.getTupleCount() > 0) {
+                FrameUtils.flushFrame(outFrame, writer);
+                outFrameAppender.reset(outFrame, true);
+            }
+            runs.subList(0, inFrames.size()).clear();
+            if (!finalPass) {
+                runs.add(0, newRun);
+            }
+        } finally {
+            try {
+                // Readers are closed as their runs are exhausted; this releases any left open by a failure.
+                for (RunFileReader cursor : runCursors) {
+                    if (cursor != null) {
+                        cursor.close();
+                    }
+                }
+            } finally {
+                if (!finalPass) {
+                    writer.close();
+                }
+            }
+        }
+    }
+
+    /** Advances run {@code runIndex}: popAndReplace with its next tuple, or pop and close it when none is left. */
+    private void setNextTopTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors, ReferencedPriorityQueue topTuples) throws IOException {
+        boolean exists = hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+        if (exists) {
+            topTuples.popAndReplace(tupleAccessors[runIndex], tupleIndexes[runIndex]);
+        } else {
+            topTuples.pop();
+            closeRun(runIndex, runCursors, tupleAccessors);
+        }
+    }
+
+    /**
+     * Returns whether run {@code runIndex} has a tuple at {@code tupleIndexes[runIndex]}, reading the run's next
+     * frame (and resetting the index to 0) when the current frame is used up.
+     */
+    private boolean hasNextTuple(int runIndex, int[] tupleIndexes, RunFileReader[] runCursors,
+            FrameTupleAccessor[] tupleAccessors) throws IOException {
+        if (tupleAccessors[runIndex] == null || runCursors[runIndex] == null) {
+            return false;
+        } else if (tupleIndexes[runIndex] >= tupleAccessors[runIndex].getTupleCount()) {
+            ByteBuffer buf = tupleAccessors[runIndex].getBuffer(); // same-as-inFrames.get(runIndex)
+            if (runCursors[runIndex].nextFrame(buf)) {
+                tupleIndexes[runIndex] = 0;
+                // Re-check rather than return true: the frame just read may hold no tuples.
+                return hasNextTuple(runIndex, tupleIndexes, runCursors, tupleAccessors);
+            } else {
+                return false;
+            }
+        } else {
+            return true;
+        }
+    }
+
+    /** Closes the reader of run {@code index} and clears its cursor and accessor slots. */
+    private void closeRun(int index, RunFileReader[] runCursors, IFrameTupleAccessor[] tupleAccessor)
+            throws HyracksDataException {
+        runCursors[index].close();
+        runCursors[index] = null;
+        tupleAccessor[index] = null;
+    }
+
+    /**
+     * Creates a comparator that compares two queue entries field by field over {@code sortFields}, using the
+     * corresponding binary comparator; the first non-zero result decides.
+     */
+    private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
+        return new Comparator<ReferenceEntry>() {
+            public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
+                FrameTupleAccessor fta1 = (FrameTupleAccessor) tp1.getAccessor();
+                FrameTupleAccessor fta2 = (FrameTupleAccessor) tp2.getAccessor();
+                int j1 = tp1.getTupleIndex();
+                int j2 = tp2.getTupleIndex();
+                byte[] b1 = fta1.getBuffer().array();
+                byte[] b2 = fta2.getBuffer().array();
+                for (int f = 0; f < sortFields.length; ++f) {
+                    int fIdx = sortFields[f];
+                    // s*/l*: start offset and length of field fIdx in each frame's backing array.
+                    int s1 = fta1.getTupleStartOffset(j1) + fta1.getFieldSlotsLength()
+                            + fta1.getFieldStartOffset(j1, fIdx);
+                    int l1 = fta1.getFieldEndOffset(j1, fIdx) - fta1.getFieldStartOffset(j1, fIdx);
+                    int s2 = fta2.getTupleStartOffset(j2) + fta2.getFieldSlotsLength()
+                            + fta2.getFieldStartOffset(j2, fIdx);
+                    int l2 = fta2.getFieldEndOffset(j2, fIdx) - fta2.getFieldStartOffset(j2, fIdx);
+                    int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
+                    if (c != 0) {
+                        return c;
+                    }
+                }
+                return 0;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileReader.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileReader.java
new file mode 100644
index 0000000..f31100e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileReader.java
@@ -0,0 +1,82 @@
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Reads a run file back one frame at a time. Each {@link #nextFrame(ByteBuffer)} call reads {@code capacity()}
+ * bytes into the caller's buffer, so a run file is expected to consist of whole frames of that size.
+ */
+public class RunFileReader implements IFrameReader {
+    private final File file;
+    /** Channel on {@link #file}; null until {@link #open()} succeeds. */
+    private FileChannel channel;
+
+    /**
+     * @param file
+     *            the run file to read; it is not opened until {@link #open()}
+     * @throws FileNotFoundException
+     *             never thrown by this constructor; kept in the signature so existing callers still compile
+     */
+    public RunFileReader(File file) throws FileNotFoundException {
+        this.file = file;
+    }
+
+    /** Opens the file read-only. */
+    @Override
+    public void open() throws HyracksDataException {
+        RandomAccessFile raf;
+        try {
+            raf = new RandomAccessFile(file, "r");
+        } catch (FileNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+        channel = raf.getChannel();
+    }
+
+    /**
+     * Clears {@code buffer} and fills it with the next {@code buffer.capacity()} bytes of the file.
+     *
+     * @return true if a whole frame was read; false at end of file, in which case the buffer may hold the bytes of
+     *         a partial frame
+     */
+    @Override
+    public boolean nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        buffer.clear();
+        int remain = buffer.capacity();
+        while (remain > 0) {
+            int len;
+            try {
+                len = channel.read(buffer);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            if (len < 0) {
+                return false;
+            }
+            remain -= len;
+        }
+        return true;
+    }
+
+    /** Closes the file. Does nothing if {@link #open()} was never called or did not succeed. */
+    @Override
+    public void close() throws HyracksDataException {
+        if (channel == null) {
+            // Like RunFileWriter.close(): closing a reader that was never opened must not throw an NPE.
+            return;
+        }
+        try {
+            channel.close();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileWriter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileWriter.java
new file mode 100644
index 0000000..fd0a03e
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/RunFileWriter.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.sort;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Writes frames sequentially to a run file, to be read back with {@link RunFileReader}.
+ */
+public class RunFileWriter implements IFrameWriter {
+    private final File file;
+    /** Channel on {@link #file}; null until {@link #open()} succeeds. */
+    private FileChannel channel;
+
+    public RunFileWriter(File file) {
+        this.file = file;
+    }
+
+    /** Opens the file in "rw" mode; writing starts at offset 0 and existing contents are not truncated. */
+    @Override
+    public void open() throws HyracksDataException {
+        RandomAccessFile raf;
+        try {
+            raf = new RandomAccessFile(file, "rw");
+        } catch (FileNotFoundException e) {
+            throw new HyracksDataException(e);
+        }
+        channel = raf.getChannel();
+    }
+
+    /**
+     * Appends the bytes between {@code buffer}'s position and limit to the file, leaving the position at the limit.
+     * <p>
+     * {@link RunFileReader} reads frames back in chunks of {@code capacity()} bytes, so callers should pass whole
+     * frames (position 0, limit equal to capacity).
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        // Loop on the buffer's own remaining byte count: a channel write only consumes position..limit, so
+        // counting down from capacity() would spin forever (write returns 0) if position > 0 or limit < capacity.
+        while (buffer.hasRemaining()) {
+            int len;
+            try {
+                len = channel.write(buffer);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            if (len < 0) {
+                throw new HyracksDataException("Error writing data");
+            }
+        }
+    }
+
+    /** Closes the file. Does nothing if {@link #open()} was never called or did not succeed. */
+    @Override
+    public void close() throws HyracksDataException {
+        if (channel != null) {
+            try {
+                channel.close();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    /** No-op: every frame is handed to the channel as soon as {@link #nextFrame(ByteBuffer)} is called. */
+    @Override
+    public void flush() throws HyracksDataException {
+    }
+}
\ No newline at end of file