cross merge fullstack_release_candidate into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@3208 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/hyracks/hyracks-dataflow-std/pom.xml b/fullstack/hyracks/hyracks-dataflow-std/pom.xml
index bf27d20..2cf0fdc 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/pom.xml
+++ b/fullstack/hyracks/hyracks-dataflow-std/pom.xml
@@ -1,8 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
- <version>0.2.3-SNAPSHOT</version>
<name>hyracks-dataflow-std</name>
<parent>
@@ -18,8 +16,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
</plugins>
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index f2b56fa..f86d9fb 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -256,8 +256,6 @@
outputAppender.reset(outputFrame, true);
- writer.open();
-
if (tPointers == null) {
// Not sorted
for (int i = 0; i < tableSize; ++i) {
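Note: removing writer.open() here implies the caller of the flush now owns the writer lifecycle. A minimal sketch of that convention, assuming a caller that already holds the downstream IFrameWriter (helper name hypothetical; the flushFrames signature is inferred from context, not confirmed by this patch):

    void flushWithCallerOwnedWriter(ISpillableTable table, IFrameWriter writer)
            throws HyracksDataException {
        writer.open();                       // caller opens exactly once
        try {
            table.flushFrames(writer, true); // flush assumes an already-open writer
        } finally {
            writer.close();                  // caller closes, even on failure
        }
    }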
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 3e5e30f..d86f1d5 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -17,7 +17,6 @@
import java.io.DataOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -47,11 +46,18 @@
private final ISerializableTable table;
private final int tableSize;
private final TuplePointer storedTuplePointer;
+ private final boolean reverseOutputOrder; // if true, write joined tuples as (build, probe) instead of (probe, build)
public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1, ISerializableTable table)
throws HyracksDataException {
+ this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, false);
+ }
+
+ public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
+ ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
+ FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1, ISerializableTable table, boolean reverse) throws HyracksDataException {
this.tableSize = tableSize;
this.table = table;
storedTuplePointer = new TuplePointer();
@@ -76,6 +82,7 @@
} else {
nullTupleBuild = null;
}
+ reverseOutputOrder = reverse;
}
public void build(ByteBuffer buffer) throws HyracksDataException {
@@ -108,18 +115,13 @@
int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
if (c == 0) {
matchFound = true;
- if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
- flushFrame(outBuffer, writer);
- appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
- throw new IllegalStateException();
- }
- }
+ appendToResult(i, tIndex, writer);
}
} while (true);
if (!matchFound && isLeftOuter) {
- if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+
+ if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
flushFrame(outBuffer, writer);
appender.reset(outBuffer, true);
@@ -128,6 +130,7 @@
throw new IllegalStateException();
}
}
+
}
}
}
@@ -145,23 +148,25 @@
buffer.position(0);
buffer.limit(buffer.capacity());
}
-
- private static class Link {
- private static final int INIT_POINTERS_SIZE = 8;
-
- long[] pointers;
- int size;
-
- Link() {
- pointers = new long[INIT_POINTERS_SIZE];
- size = 0;
- }
-
- void add(long pointer) {
- if (size >= pointers.length) {
- pointers = Arrays.copyOf(pointers, pointers.length * 2);
+
+ private void appendToResult(int probeSidetIx, int buildSidetIx, IFrameWriter writer) throws HyracksDataException {
+ if (!reverseOutputOrder) {
+ if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorProbe, probeSidetIx, accessorBuild, buildSidetIx)) {
+ throw new IllegalStateException();
+ }
}
- pointers[size++] = pointer;
- }
+ }
+ else {
+ if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorBuild, buildSidetIx, accessorProbe, probeSidetIx)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
}
}
\ No newline at end of file
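Note: reverseOutputOrder exists to support role reversal in the hybrid hash join (see OptimizedHybridHashJoinOperatorDescriptor below). When recursion swaps the relations, the probe accessor actually scans the original build input, so concatenation must be flipped to keep the output schema as (original probe fields, original build fields). A hedged construction sketch using the new overload (variable names illustrative):

    InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize,
            probeAccessor, probeHpc, buildAccessor, buildHpc,
            comparator, isLeftOuter, nullWriters1, table,
            true); // reverse = true: roles were swapped upstream, so flip the append order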
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 7e84229..6870e71 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -14,15 +14,18 @@
*/
package edu.uci.ics.hyracks.dataflow.std.join;
+import java.io.DataOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -42,9 +45,12 @@
private RunFileReader runFileReader;
private int currentMemSize = 0;
private final RunFileWriter runFileWriter;
+ private final boolean isLeftOuter;
+ private final ArrayTupleBuilder nullTupleBuilder;
public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
- ITuplePairComparator comparators, int memSize) throws HyracksDataException {
+ ITuplePairComparator comparators, int memSize, boolean isLeftOuter, INullWriter[] nullWriters1)
+ throws HyracksDataException {
this.accessorInner = accessor1;
this.accessorOuter = accessor0;
this.appender = new FrameTupleAppender(ctx.getFrameSize());
@@ -56,6 +62,19 @@
this.memSize = memSize;
this.ctx = ctx;
+ this.isLeftOuter = isLeftOuter;
+ if (isLeftOuter) {
+ int innerFieldCount = accessorInner.getFieldCount();
+ nullTupleBuilder = new ArrayTupleBuilder(innerFieldCount);
+ DataOutput out = nullTupleBuilder.getDataOutput();
+ for (int i = 0; i < innerFieldCount; i++) {
+ nullWriters1[i].writeNull(out);
+ nullTupleBuilder.addFieldEndOffset();
+ }
+ } else {
+ nullTupleBuilder = null;
+ }
+
FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
this.getClass().getSimpleName() + this.toString());
runFileWriter = new RunFileWriter(file, ctx.getIOManager());
@@ -108,9 +127,11 @@
int tupleCount1 = accessorInner.getTupleCount();
for (int i = 0; i < tupleCount0; ++i) {
+ boolean matchFound = false;
for (int j = 0; j < tupleCount1; ++j) {
int c = compare(accessorOuter, i, accessorInner, j);
if (c == 0) {
+ matchFound = true;
if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
flushFrame(outBuffer, writer);
appender.reset(outBuffer, true);
@@ -120,6 +141,18 @@
}
}
}
+
+ if (!matchFound && isLeftOuter) {
+ if (!appender.appendConcat(accessorOuter, i, nullTupleBuilder.getFieldEndOffsets(),
+ nullTupleBuilder.getByteArray(), 0, nullTupleBuilder.getSize())) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorOuter, i, nullTupleBuilder.getFieldEndOffsets(),
+ nullTupleBuilder.getByteArray(), 0, nullTupleBuilder.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
}
}
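Note: with the two new parameters, NestedLoopJoin can run as a left-outer join, padding each unmatched outer tuple with nulls built by nullWriters1 (one per inner-side field). A hedged usage sketch (rd0/rd1, comparator, memSize, and the factories are assumed to be in scope):

    INullWriter[] nullWriters = new INullWriter[nullWriterFactories.length];
    for (int i = 0; i < nullWriterFactories.length; i++) {
        nullWriters[i] = nullWriterFactories[i].createNullWriter();
    }
    NestedLoopJoin nlj = new NestedLoopJoin(ctx,
            new FrameTupleAccessor(ctx.getFrameSize(), rd0), // outer side
            new FrameTupleAccessor(ctx.getFrameSize(), rd1), // inner side
            comparator, memSize, true, nullWriters);         // isLeftOuter = true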
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index a699703..0be01c1 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -25,6 +25,8 @@
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
@@ -47,13 +49,18 @@
private static final long serialVersionUID = 1L;
private final ITuplePairComparatorFactory comparatorFactory;
private final int memSize;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
- ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize) {
+ ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
+ boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
super(spec, 2, 1);
this.comparatorFactory = comparatorFactory;
this.recordDescriptors[0] = recordDescriptor;
this.memSize = memSize;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
}
@Override
@@ -111,6 +118,13 @@
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private JoinCacheTaskState state;
@@ -118,8 +132,11 @@
public void open() throws HyracksDataException {
state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
partition));
+
state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
- new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize);
+ new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize, isLeftOuter,
+ nullWriters1);
+
}
@Override
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index 2905574..6e2b16a 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -81,6 +81,8 @@
private int[] buildPSizeInFrames; //Used for partition tuning
private int freeFramesCounter; //Used for partition tuning
+
+ private boolean isTableEmpty; // handles the case where the build side is empty (tableSize is 0)
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
@@ -89,10 +91,10 @@
this.memForJoin = memForJoin;
this.buildRd = buildRd;
this.probeRd = probeRd;
- this.buildHpc = probeHpc;
- this.probeHpc = buildHpc;
- this.buildKeys = keys0;
- this.probeKeys = keys1;
+ this.buildHpc = buildHpc;
+ this.probeHpc = probeHpc;
+ this.buildKeys = keys1;
+ this.probeKeys = keys0;
this.comparators = comparators;
this.rel0Name = rel0Name;
this.rel1Name = rel1Name;
@@ -117,10 +119,10 @@
this.memForJoin = memForJoin;
this.buildRd = buildRd;
this.probeRd = probeRd;
- this.buildHpc = probeHpc;
- this.probeHpc = buildHpc;
- this.buildKeys = keys0;
- this.probeKeys = keys1;
+ this.buildHpc = buildHpc;
+ this.probeHpc = probeHpc;
+ this.buildKeys = keys1;
+ this.probeKeys = keys0;
this.comparators = comparators;
this.rel0Name = rel0Name;
this.rel1Name = rel1Name;
@@ -171,6 +173,12 @@
public void build(ByteBuffer buffer) throws HyracksDataException {
accessorBuild.reset(buffer);
int tupleCount = accessorBuild.getTupleCount();
+
+ boolean print = false; // leftover debug switch: flip to true to dump incoming build frames
+ if (print) {
+ accessorBuild.prettyPrint();
+ }
+
for (int i = 0; i < tupleCount; ++i) {
int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
processTuple(i, pid);
@@ -338,6 +346,7 @@
createInMemoryJoiner(inMemTupCount);
cacheInMemJoin();
+ this.isTableEmpty = (inMemTupCount == 0);
}
private void partitionTune() throws HyracksDataException {
@@ -457,10 +466,14 @@
}
public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
-
accessorProbe.reset(buffer);
int tupleCount = accessorProbe.getTupleCount();
+ boolean print = false; // leftover debug switch: flip to true to dump incoming probe frames
+ if (print) {
+ accessorProbe.prettyPrint();
+ }
+
if (numOfSpilledParts == 0) {
inMemJoiner.join(buffer, writer);
return;
@@ -604,4 +617,8 @@
+ freeFramesCounter;
return s;
}
+
+ public boolean isTableEmpty() {
+ return this.isTableEmpty;
+ }
}
\ No newline at end of file
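Note: the un-crossed assignments pair each side with its own partition computer, and map keys1 to the build side to match the re-wired activity edges in the descriptor below. The new isTableEmpty flag lets the probe phase short-circuit when the build side produced no tuples. A hedged sketch of how the flag is consumed (helper name hypothetical; mirrors the guard added in the descriptor):

    void probeFrame(OptimizedHybridHashJoin hhj, ByteBuffer buffer, IFrameWriter writer)
            throws HyracksDataException {
        if (hhj.isTableEmpty()) {
            return; // empty build side: an inner join cannot produce matches, skip probing
        }
        hhj.probe(buffer, writer);
    }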
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 3a7ee2c..cf39416 100644
--- a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -31,6 +31,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -135,6 +136,7 @@
recordDescriptors[0] = recordDescriptor;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
+
}
@@ -167,10 +169,10 @@
ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(probeAid, buildAid);
builder.addActivity(this, phase1);
- builder.addSourceEdge(0, phase1, 0);
+ builder.addSourceEdge(1, phase1, 0);
builder.addActivity(this, phase2);
- builder.addSourceEdge(1, phase2, 0);
+ builder.addSourceEdge(0, phase2, 0);
builder.addBlockingEdge(phase1, phase2);
@@ -253,14 +255,8 @@
for (int i = 0; i < comparatorFactories.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
-
- final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
- if (isLeftOuter) {
- for (int i = 0; i < nullWriterFactories1.length; i++) {
- nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
- }
- }
-
+
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
.getJobId(), new TaskId(getActivityId(), partition));
@@ -278,9 +274,17 @@
state.memForJoin = memsize - 2;
state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor,
nPartitions);
- state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
- PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
- buildHpc);
+ if (!isLeftOuter) {
+ state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+ PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
+ buildHpc);
+ }
+ else {
+ state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+ PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
+ buildHpc, isLeftOuter, nullWriterFactories1);
+ }
+
state.hybridHJ.initBuild();
}
@@ -368,7 +372,9 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- state.hybridHJ.probe(buffer, writer);
+ if (!state.hybridHJ.isTableEmpty()) {
+ state.hybridHJ.probe(buffer, writer);
+ }
}
@Override
@@ -418,34 +424,40 @@
//Apply in-Mem HJ if possible
if ((buildPartSize < state.memForJoin) || (probePartSize < state.memForJoin)) {
int tabSize = -1;
- if (buildPartSize < probePartSize) {
+
+ if (isLeftOuter || buildPartSize < probePartSize) {
tabSize = ohhj.getBuildPartitionSizeInTup(pid);
+
if (tabSize == 0) {
throw new HyracksDataException(
"Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
}
- //Build Side is smaller
- applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, hpcRep1, hpcRep0,
- buildSideReader, probeSideReader);
+ //Build Side is smaller
+ applyInMemHashJoin(buildKeys, probeKeys, tabSize, probeRd, buildRd, hpcRep0, hpcRep1,
+ buildSideReader, probeSideReader, false, pid);
- } else { //Role Reversal
+ }
+
+ else { //Role Reversal
tabSize = ohhj.getProbePartitionSizeInTup(pid);
if (tabSize == 0) {
throw new HyracksDataException(
"Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
}
//Probe Side is smaller
- applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, hpcRep0, hpcRep1,
- probeSideReader, buildSideReader);
+
+ applyInMemHashJoin(probeKeys, buildKeys, tabSize, buildRd, probeRd, hpcRep1, hpcRep0,
+ probeSideReader, buildSideReader, true, pid);
}
}
//Apply (Recursive) HHJ
else {
OptimizedHybridHashJoin rHHj;
- if (buildPartSize < probePartSize) { //Build Side is smaller
+ if (isLeftOuter || buildPartSize < probePartSize) { //Build Side is smaller
int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
nPartitions);
+
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc);
@@ -488,14 +500,14 @@
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
+
if (rbrfw == null || rprfw == null) {
continue;
}
int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
- if (buildSideInTups < probeSideInTups) {
+ if (isLeftOuter || buildSideInTups < probeSideInTups) {
applyNestedLoopJoin(probeRd, buildRd, state.memForJoin, rbrfw, rprfw,
nljComparator0);
} else {
@@ -507,6 +519,7 @@
} else { //Role Reversal (Probe Side is smaller)
int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
nPartitions);
+
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc);
@@ -545,7 +558,7 @@
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
+
if (rbrfw == null || rprfw == null) {
continue;
}
@@ -569,14 +582,14 @@
private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
- ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader)
+ ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
throws HyracksDataException {
ISerializableTable table = new SerializableHashTable(tabSize, ctx);
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
- isLeftOuter, nullWriters1, table);
+ isLeftOuter, nullWriters1, table, reverse);
bReader.open();
rPartbuff.clear();
@@ -604,7 +617,7 @@
throws HyracksDataException {
NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
- new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize);
+ new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, false, null);
ByteBuffer cacheBuff = ctx.allocateFrame();
innerReader.open();
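Note: every side-selection test above now reads (isLeftOuter || buildPartSize < probePartSize): role reversal is only legal for inner joins, because a left-outer join must keep the original outer relation on the probe side so that exactly its unmatched tuples are null-padded. The rule, isolated as a hedged predicate (helper name hypothetical):

    static boolean buildStaysOnBuildSide(boolean isLeftOuter, long buildPartSize, long probePartSize) {
        // Inner join: build on the smaller side. Left outer: the build side is fixed,
        // since reversing roles would null-pad the wrong relation's tuples.
        return isLeftOuter || buildPartSize < probePartSize;
    }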
diff --git a/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
new file mode 100644
index 0000000..edca60a
--- /dev/null
+++ b/fullstack/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.result;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameOutputStream;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class ResultWriterOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ private final ResultSetId rsId;
+
+ private final boolean ordered;
+
+ private final IResultSerializerFactory resultSerializerFactory;
+
+ public ResultWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, ResultSetId rsId, boolean ordered,
+ IResultSerializerFactory resultSerializerFactory) throws IOException {
+ super(spec, 1, 0);
+ this.rsId = rsId;
+ this.ordered = ordered;
+ this.resultSerializerFactory = resultSerializerFactory;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
+ final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
+
+ final ByteBuffer outputBuffer = ctx.allocateFrame();
+
+ final FrameOutputStream frameOutputStream = new FrameOutputStream(ctx.getFrameSize());
+ frameOutputStream.reset(outputBuffer, true);
+ PrintStream printStream = new PrintStream(frameOutputStream);
+
+ final RecordDescriptor outRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ final IResultSerializer resultSerializer = resultSerializerFactory.createResultSerializer(outRecordDesc,
+ printStream);
+
+ final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDesc);
+
+ return new AbstractUnaryInputSinkOperatorNodePushable() {
+ IFrameWriter datasetPartitionWriter;
+
+ @Override
+ public void open() throws HyracksDataException {
+ try {
+ datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, rsId, ordered, partition,
+ nPartitions);
+ datasetPartitionWriter.open();
+ resultSerializer.init();
+ } catch (HyracksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ frameTupleAccessor.reset(buffer);
+ for (int tIndex = 0; tIndex < frameTupleAccessor.getTupleCount(); tIndex++) {
+ resultSerializer.appendTuple(frameTupleAccessor, tIndex);
+ if (!frameOutputStream.appendTuple()) {
+ datasetPartitionWriter.nextFrame(outputBuffer);
+ frameOutputStream.reset(outputBuffer, true);
+
+ /* TODO(madhusudancs): This works under the assumption that no single serialized record is
+ * longer than the buffer size.
+ */
+ resultSerializer.appendTuple(frameTupleAccessor, tIndex);
+ frameOutputStream.appendTuple();
+ }
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ datasetPartitionWriter.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (frameOutputStream.getTupleCount() > 0) {
+ datasetPartitionWriter.nextFrame(outputBuffer);
+ frameOutputStream.reset(outputBuffer, true);
+ }
+ datasetPartitionWriter.close();
+ }
+ };
+ }
+}
\ No newline at end of file
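Note: ResultWriterOperatorDescriptor is a sink (1 input, 0 outputs) that serializes each incoming frame into a dataset partition. A hedged wiring sketch (the producer operator and serializer factory are assumed to exist elsewhere; names illustrative):

    static void attachResultWriter(JobSpecification spec, IOperatorDescriptor producer,
            IResultSerializerFactory serializerFactory) throws IOException {
        ResultSetId rsId = new ResultSetId(1); // illustrative result-set id
        ResultWriterOperatorDescriptor writer = new ResultWriterOperatorDescriptor(
                spec, rsId, false, serializerFactory); // ordered = false
        spec.connect(new OneToOneConnectorDescriptor(spec), producer, 0, writer, 0);
        spec.addRoot(writer);
    }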