Merged trunk -r 363:437 into branch
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@468 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-std/.settings/org.eclipse.jdt.core.prefs b/hyracks-dataflow-std/.settings/org.eclipse.jdt.core.prefs
index 1ca1bd4..450f5c4 100644
--- a/hyracks-dataflow-std/.settings/org.eclipse.jdt.core.prefs
+++ b/hyracks-dataflow-std/.settings/org.eclipse.jdt.core.prefs
@@ -1,4 +1,4 @@
-#Thu Jul 29 14:32:56 PDT 2010
+#Fri May 20 19:34:04 PDT 2011
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.compliance=1.6
diff --git a/hyracks-dataflow-std/pom.xml b/hyracks-dataflow-std/pom.xml
index 0940856..747e657 100644
--- a/hyracks-dataflow-std/pom.xml
+++ b/hyracks-dataflow-std/pom.xml
@@ -2,12 +2,12 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
- <version>0.1.4</version>
+ <version>0.1.5</version>
<parent>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks</artifactId>
- <version>0.1.4</version>
+ <version>0.1.5</version>
</parent>
<build>
@@ -27,14 +27,14 @@
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
- <version>0.1.4</version>
+ <version>0.1.5</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>edu.uci.ics.hyracks</groupId>
<artifactId>hyracks-dataflow-common</artifactId>
- <version>0.1.4</version>
+ <version>0.1.5</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
index a99cba9..2ddffde 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/CountAggregatorFactory.java
@@ -69,7 +69,7 @@
throws HyracksDataException {
count = IntegerSerializerDeserializer.getInt(
accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
+ accessor.getFieldStartOffset(tIndex, fIndex));
}
@@ -89,7 +89,7 @@
throws HyracksDataException {
count += IntegerSerializerDeserializer.getInt(
accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
+ accessor.getFieldStartOffset(tIndex, fIndex));
}
};
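Note on the `* 2` -> `* 4` changes above (repeated in the three aggregator factories that follow, and matched by the getShort->getInt switch in FrameSorter further down): the per-tuple field-slot array widens from 2-byte shorts to 4-byte ints. A minimal sketch of the offset arithmetic these call sites assume; fieldAbsoluteOffset is a hypothetical helper, not part of the patch:

    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;

    final class FieldOffsets {
        // Assumed frame layout: each tuple begins with fieldCount 4-byte
        // field-end offsets (previously 2-byte shorts), followed by the
        // concatenated field data.
        static int fieldAbsoluteOffset(FrameTupleAccessor accessor, int tIndex, int fIndex) {
            return accessor.getTupleStartOffset(tIndex)
                    + accessor.getFieldCount() * 4 // skip over the int slot array
                    + accessor.getFieldStartOffset(tIndex, fIndex);
        }
    }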
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
index 7e6bc8c..8fd1f13 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/FloatSumAggregatorFactory.java
@@ -103,7 +103,7 @@
throws HyracksDataException {
sum = FloatSerializerDeserializer.getFloat(
accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
+ accessor.getFieldStartOffset(tIndex, fIndex));
}
@@ -117,7 +117,7 @@
throws HyracksDataException {
sum += FloatSerializerDeserializer.getFloat(
accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
+ accessor.getFieldStartOffset(tIndex, fIndex));
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
index 0c3b0d5..cc999d5 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/MinMaxAggregatorFactory.java
@@ -100,7 +100,7 @@
throws HyracksDataException {
minmax = FloatSerializerDeserializer.getFloat(
accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
+ accessor.getFieldStartOffset(tIndex, fIndex));
}
@@ -115,7 +115,7 @@
throws HyracksDataException {
minmax = FloatSerializerDeserializer.getFloat(
accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
+ accessor.getFieldStartOffset(tIndex, fIndex));
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
index 64335f1..c6bcc81 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/aggregators/SumAggregatorFactory.java
@@ -102,7 +102,7 @@
throws HyracksDataException {
sum = IntegerSerializerDeserializer.getInt(
accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
+ accessor.getFieldStartOffset(tIndex, fIndex));
}
@@ -116,7 +116,7 @@
throws HyracksDataException {
sum += IntegerSerializerDeserializer.getInt(
accessor.getBuffer().array(),
- accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 2
+ accessor.getTupleStartOffset(tIndex) + accessor.getFieldCount() * 4
+ accessor.getFieldStartOffset(tIndex, fIndex));
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 0eebd93..e4613dd 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -19,6 +19,7 @@
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -50,7 +51,8 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+ public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+ ICCApplicationContext appCtx) {
// do nothing
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index 7e97975..4453412 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -19,6 +19,7 @@
import org.json.JSONException;
import org.json.JSONObject;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.IConstraintExpressionAcceptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
@@ -68,7 +69,8 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+ public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+ ICCApplicationContext appCtx) {
// do nothing
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index a038a40..6ee9ab37 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.dataflow.std.connectors;
+import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.comm.IConnectionDemultiplexer;
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
@@ -52,7 +53,8 @@
}
@Override
- public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan) {
+ public void contributeSchedulingConstraints(IConstraintExpressionAcceptor constraintAcceptor, JobPlan plan,
+ ICCApplicationContext appCtx) {
JobSpecification jobSpec = plan.getJobSpecification();
IOperatorDescriptor consumer = jobSpec.getConsumer(this);
IOperatorDescriptor producer = jobSpec.getProducer(this);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index bff101e..1a69952 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -44,8 +46,8 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final String SMALLRELATION = "RelR";
- private static final String LARGERELATION = "RelS";
+ private static final String RELATION0 = "Rel0";
+ private static final String RELATION1 = "Rel1";
private static final long serialVersionUID = 1L;
private final int[] keys0;
@@ -56,6 +58,8 @@
private final double factor;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
@@ -69,24 +73,44 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = false;
+ this.nullWriterFactories1 = null;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+ INullWriterFactory[] nullWriterFactories1) {
+ super(spec, 2, 1);
+ this.memsize = memsize;
+ this.inputsize0 = inputsize0;
+ this.recordsPerFrame = recordsPerFrame;
+ this.factor = factor;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
recordDescriptors[0] = recordDescriptor;
}
@Override
public void contributeTaskGraph(IActivityGraphBuilder builder) {
- HashPartitionActivityNode rpart = new HashPartitionActivityNode(SMALLRELATION, keys0, 0);
- HashPartitionActivityNode spart = new HashPartitionActivityNode(LARGERELATION, keys1, 1);
+ HashPartitionActivityNode part0 = new HashPartitionActivityNode(RELATION0, keys0, 0);
+ HashPartitionActivityNode part1 = new HashPartitionActivityNode(RELATION1, keys1, 1);
JoinActivityNode join = new JoinActivityNode();
- builder.addTask(rpart);
- builder.addSourceEdge(0, rpart, 0);
+ builder.addTask(part0);
+ builder.addSourceEdge(0, part0, 0);
- builder.addTask(spart);
- builder.addSourceEdge(1, spart, 0);
+ builder.addTask(part1);
+ builder.addSourceEdge(1, part1, 0);
builder.addTask(join);
- builder.addBlockingEdge(rpart, spart);
- builder.addBlockingEdge(spart, join);
+ builder.addBlockingEdge(part0, part1);
+ builder.addBlockingEdge(part1, join);
builder.addTargetEdge(0, join, 0);
}
@@ -217,18 +241,24 @@
}
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
private InMemoryHashJoin joiner;
- private RunFileWriter[] rWriters;
- private RunFileWriter[] sWriters;
+ private RunFileWriter[] buildWriters;
+ private RunFileWriter[] probeWriters;
private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
@Override
public void initialize() throws HyracksDataException {
- rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
- sWriters = (RunFileWriter[]) env.get(LARGERELATION);
+ buildWriters = (RunFileWriter[]) env.get(RELATION1);
+ probeWriters = (RunFileWriter[]) env.get(RELATION0);
ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
@@ -241,34 +271,36 @@
// buffer
int tableSize = (int) (numPartitions * recordsPerFrame * factor);
for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
- RunFileWriter rWriter = rWriters[partitionid];
- RunFileWriter sWriter = sWriters[partitionid];
- if (rWriter == null || sWriter == null) {
+ RunFileWriter buildWriter = buildWriters[partitionid];
+ RunFileWriter probeWriter = probeWriters[partitionid];
+ if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
- new FrameTuplePairComparator(keys0, keys1, comparators));
+ new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
// build
- RunFileReader rReader = rWriter.createReader();
- rReader.open();
- while (rReader.nextFrame(buffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame();
- FrameUtils.copy(buffer, copyBuffer);
- joiner.build(copyBuffer);
- buffer.clear();
+ if (buildWriter != null) {
+ RunFileReader buildReader = buildWriter.createReader();
+ buildReader.open();
+ while (buildReader.nextFrame(buffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(buffer, copyBuffer);
+ joiner.build(copyBuffer);
+ buffer.clear();
+ }
+ buildReader.close();
}
- rReader.close();
// probe
- RunFileReader sReader = sWriter.createReader();
- sReader.open();
- while (sReader.nextFrame(buffer)) {
+ RunFileReader probeReader = probeWriter.createReader();
+ probeReader.open();
+ while (probeReader.nextFrame(buffer)) {
joiner.join(buffer, writer);
buffer.clear();
}
- sReader.close();
+ probeReader.close();
joiner.closeJoin(writer);
}
writer.close();
@@ -276,8 +308,8 @@
@Override
public void deinitialize() throws HyracksDataException {
- env.set(LARGERELATION, null);
- env.set(SMALLRELATION, null);
+ env.set(RELATION1, null);
+ env.set(RELATION0, null);
}
};
return op;
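The second constructor added above enables a left outer join: when isLeftOuter is true, partitions with an empty build side are no longer skipped, and unmatched probe tuples are padded with nulls. A hedged sketch of how the new variant might be instantiated; the tuning values are illustrative, not defaults from the codebase:

    import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
    import edu.uci.ics.hyracks.api.job.JobSpecification;
    import edu.uci.ics.hyracks.dataflow.std.join.GraceHashJoinOperatorDescriptor;

    final class LeftOuterGraceJoinExample {
        // Keys, factories and outRecDesc are assumed to come from the surrounding
        // job setup; only the constructor shape is taken from this patch. The
        // null-writer array needs one factory per build-side (input 1) field.
        static GraceHashJoinOperatorDescriptor create(JobSpecification spec, int[] keys0, int[] keys1,
                IBinaryHashFunctionFactory[] hashFns, IBinaryComparatorFactory[] cmps,
                RecordDescriptor outRecDesc, INullWriterFactory[] nullWriterFactories) {
            return new GraceHashJoinOperatorDescriptor(spec, 32 /* memsize */, 1000 /* inputsize0 */,
                    200 /* recordsPerFrame */, 1.2 /* factor */, keys0, keys1, hashFns, cmps,
                    outRecDesc, true /* isLeftOuter */, nullWriterFactories);
        }
    }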
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index d7c1086..516bb88 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -46,19 +48,21 @@
public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final String JOINER0 = "joiner0";
- private static final String SMALLRELATION = "RelR";
- private static final String LARGERELATION = "RelS";
+ private static final String BUILDRELATION = "BuildRel";
+ private static final String PROBERELATION = "ProbeRel";
private static final String MEM_HASHTABLE = "MEMORY_HASHTABLE";
private static final String NUM_PARTITION = "NUMBER_B_PARTITIONS"; // B
private final int memsize;
private static final long serialVersionUID = 1L;
private final int inputsize0;
private final double factor;
+ private final int recordsPerFrame;
private final int[] keys0;
private final int[] keys1;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
- private final int recordsPerFrame;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
/**
* @param spec
@@ -88,19 +92,39 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = false;
+ this.nullWriterFactories1 = null;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ public HybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+ INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+ super(spec, 2, 1);
+ this.memsize = memsize;
+ this.inputsize0 = inputsize0;
+ this.factor = factor;
+ this.recordsPerFrame = recordsPerFrame;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
recordDescriptors[0] = recordDescriptor;
}
@Override
public void contributeTaskGraph(IActivityGraphBuilder builder) {
- BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(SMALLRELATION);
- PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(LARGERELATION);
+ BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(BUILDRELATION);
+ PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(PROBERELATION);
builder.addTask(phase1);
- builder.addSourceEdge(0, phase1, 0);
+ builder.addSourceEdge(1, phase1, 0);
builder.addTask(phase2);
- builder.addSourceEdge(1, phase2, 0);
+ builder.addSourceEdge(0, phase2, 0);
builder.addBlockingEdge(phase1, phase2);
@@ -127,12 +151,18 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private InMemoryHashJoin joiner0;
- private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
- ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
- .createPartitioner();
+ private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
+ private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
+ hashFunctionFactories).createPartitioner();
private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
private ByteBuffer[] bufferForPartitions;
@@ -148,8 +178,8 @@
for (int i = 0; i < B; i++) {
ByteBuffer buf = bufferForPartitions[i];
- accessor0.reset(buf);
- if (accessor0.getTupleCount() > 0) {
+ accessorBuild.reset(buf);
+ if (accessorBuild.getTupleCount() > 0) {
write(i, buf);
}
closeWriter(i);
@@ -165,17 +195,17 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (memoryForHashtable != memsize - 2) {
- accessor0.reset(buffer);
- int tCount = accessor0.getTupleCount();
+ accessorBuild.reset(buffer);
+ int tCount = accessorBuild.getTupleCount();
for (int i = 0; i < tCount; ++i) {
int entry = -1;
if (memoryForHashtable == 0) {
- entry = hpc0.partition(accessor0, i, B);
+ entry = hpcBuild.partition(accessorBuild, i, B);
boolean newBuffer = false;
ByteBuffer bufBi = bufferForPartitions[entry];
while (true) {
appender.reset(bufBi, newBuffer);
- if (appender.append(accessor0, i)) {
+ if (appender.append(accessorBuild, i)) {
break;
} else {
write(entry, bufBi);
@@ -184,15 +214,16 @@
}
}
} else {
- entry = hpc0.partition(accessor0, i, (int) (inputsize0 * factor / nPartitions));
+ entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
if (entry < memoryForHashtable) {
while (true) {
- if (!ftappender.append(accessor0, i)) {
+ if (!ftappender.append(accessorBuild, i)) {
build(inBuffer);
ftappender.reset(inBuffer, true);
- } else
+ } else {
break;
+ }
}
} else {
entry %= B;
@@ -200,7 +231,7 @@
ByteBuffer bufBi = bufferForPartitions[entry];
while (true) {
appender.reset(bufBi, newBuffer);
- if (appender.append(accessor0, i)) {
+ if (appender.append(accessorBuild, i)) {
break;
} else {
write(entry, bufBi);
@@ -255,7 +286,7 @@
int tableSize = (int) (memoryForHashtable * recordsPerFrame * factor);
joiner0 = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
- keys0, keys1, comparators));
+ keys0, keys1, comparators), isLeftOuter, nullWriters1);
bufferForPartitions = new ByteBuffer[B];
fWriters = new RunFileWriter[B];
for (int i = 0; i < B; i++) {
@@ -298,11 +329,11 @@
private class PartitionAndJoinActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- private String largeRelation;
+ private String relationName;
public PartitionAndJoinActivityNode(String relationName) {
super();
- this.largeRelation = relationName;
+ this.relationName = relationName;
}
@Override
@@ -315,22 +346,28 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private InMemoryHashJoin joiner0;
- private final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
- private ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
+ private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+ private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
hashFunctionFactories);
- private ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
+ private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
hashFunctionFactories);
- ITuplePartitionComputer hpc1 = hpcf1.createPartitioner();
+ private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
private final FrameTupleAppender ftap = new FrameTupleAppender(ctx.getFrameSize());
private final ByteBuffer inBuffer = ctx.allocateFrame();
private final ByteBuffer outBuffer = ctx.allocateFrame();
- private RunFileWriter[] rWriters;
- private RunFileWriter[] sWriters;
+ private RunFileWriter[] buildWriters;
+ private RunFileWriter[] probeWriters;
private ByteBuffer[] bufferForPartitions;
private int B;
private int memoryForHashtable;
@@ -339,10 +376,10 @@
public void open() throws HyracksDataException {
joiner0 = (InMemoryHashJoin) env.get(JOINER0);
writer.open();
- rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
+ buildWriters = (RunFileWriter[]) env.get(BUILDRELATION);
B = (Integer) env.get(NUM_PARTITION);
memoryForHashtable = (Integer) env.get(MEM_HASHTABLE);
- sWriters = new RunFileWriter[B];
+ probeWriters = new RunFileWriter[B];
bufferForPartitions = new ByteBuffer[B];
for (int i = 0; i < B; i++) {
bufferForPartitions[i] = ctx.allocateFrame();
@@ -354,18 +391,18 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (memoryForHashtable != memsize - 2) {
- accessor1.reset(buffer);
- int tupleCount1 = accessor1.getTupleCount();
- for (int i = 0; i < tupleCount1; ++i) {
+ accessorProbe.reset(buffer);
+ int tupleCount0 = accessorProbe.getTupleCount();
+ for (int i = 0; i < tupleCount0; ++i) {
int entry = -1;
if (memoryForHashtable == 0) {
- entry = hpc1.partition(accessor1, i, B);
+ entry = hpcProbe.partition(accessorProbe, i, B);
boolean newBuffer = false;
ByteBuffer outbuf = bufferForPartitions[entry];
while (true) {
appender.reset(outbuf, newBuffer);
- if (appender.append(accessor1, i)) {
+ if (appender.append(accessorProbe, i)) {
break;
} else {
write(entry, outbuf);
@@ -374,10 +411,10 @@
}
}
} else {
- entry = hpc1.partition(accessor1, i, (int) (inputsize0 * factor / nPartitions));
+ entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
if (entry < memoryForHashtable) {
while (true) {
- if (!ftap.append(accessor1, i)) {
+ if (!ftap.append(accessorProbe, i)) {
joiner0.join(inBuffer, writer);
ftap.reset(inBuffer, true);
} else
@@ -390,7 +427,7 @@
ByteBuffer outbuf = bufferForPartitions[entry];
while (true) {
appender.reset(outbuf, newBuffer);
- if (appender.append(accessor1, i)) {
+ if (appender.append(accessorProbe, i)) {
break;
} else {
write(entry, outbuf);
@@ -415,8 +452,8 @@
if (memoryForHashtable != memsize - 2) {
for (int i = 0; i < B; i++) {
ByteBuffer buf = bufferForPartitions[i];
- accessor1.reset(buf);
- if (accessor1.getTupleCount() > 0) {
+ accessorProbe.reset(buf);
+ if (accessorProbe.getTupleCount() > 0) {
write(i, buf);
}
closeWriter(i);
@@ -430,40 +467,43 @@
tableSize = (int) (memsize * recordsPerFrame * factor);
}
for (int partitionid = 0; partitionid < B; partitionid++) {
- RunFileWriter rWriter = rWriters[partitionid];
- RunFileWriter sWriter = sWriters[partitionid];
- if (rWriter == null || sWriter == null) {
+ RunFileWriter buildWriter = buildWriters[partitionid];
+ RunFileWriter probeWriter = probeWriters[partitionid];
+ if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
- hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators));
+ hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
+ nullWriters1);
- RunFileReader rReader = rWriter.createReader();
- rReader.open();
- while (rReader.nextFrame(inBuffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame();
- FrameUtils.copy(inBuffer, copyBuffer);
- joiner.build(copyBuffer);
- inBuffer.clear();
+ if (buildWriter != null) {
+ RunFileReader buildReader = buildWriter.createReader();
+ buildReader.open();
+ while (buildReader.nextFrame(inBuffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(inBuffer, copyBuffer);
+ joiner.build(copyBuffer);
+ inBuffer.clear();
+ }
+ buildReader.close();
}
- rReader.close();
// probe
- RunFileReader sReader = sWriter.createReader();
- sReader.open();
- while (sReader.nextFrame(inBuffer)) {
+ RunFileReader probeReader = probeWriter.createReader();
+ probeReader.open();
+ while (probeReader.nextFrame(inBuffer)) {
joiner.join(inBuffer, writer);
inBuffer.clear();
}
- sReader.close();
+ probeReader.close();
joiner.closeJoin(writer);
}
}
writer.close();
- env.set(LARGERELATION, null);
- env.set(SMALLRELATION, null);
+ env.set(PROBERELATION, null);
+ env.set(BUILDRELATION, null);
env.set(JOINER0, null);
env.set(MEM_HASHTABLE, null);
env.set(NUM_PARTITION, null);
@@ -475,19 +515,19 @@
}
private void closeWriter(int i) throws HyracksDataException {
- RunFileWriter writer = sWriters[i];
+ RunFileWriter writer = probeWriters[i];
if (writer != null) {
writer.close();
}
}
private void write(int i, ByteBuffer head) throws HyracksDataException {
- RunFileWriter writer = sWriters[i];
+ RunFileWriter writer = probeWriters[i];
if (writer == null) {
- FileReference file = ctx.createWorkspaceFile(largeRelation);
+ FileReference file = ctx.createWorkspaceFile(relationName);
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
- sWriters[i] = writer;
+ probeWriters[i] = writer;
}
writer.nextFrame(head);
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 4d98c6a..94a9501 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.dataflow.std.join;
+import java.io.DataOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -21,8 +22,10 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -30,36 +33,51 @@
public class InMemoryHashJoin {
private final Link[] table;
private final List<ByteBuffer> buffers;
- private final FrameTupleAccessor accessor0;
- private final ITuplePartitionComputer tpc0;
- private final FrameTupleAccessor accessor1;
- private final ITuplePartitionComputer tpc1;
+ private final FrameTupleAccessor accessorBuild;
+ private final ITuplePartitionComputer tpcBuild;
+ private final FrameTupleAccessor accessorProbe;
+ private final ITuplePartitionComputer tpcProbe;
private final FrameTupleAppender appender;
private final FrameTuplePairComparator tpComparator;
private final ByteBuffer outBuffer;
-
+ private final boolean isLeftOuter;
+ private final ArrayTupleBuilder nullTupleBuild;
+
public InMemoryHashJoin(IHyracksStageletContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
- FrameTuplePairComparator comparator) {
+ FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
+ throws HyracksDataException {
table = new Link[tableSize];
buffers = new ArrayList<ByteBuffer>();
- this.accessor0 = accessor0;
- this.tpc0 = tpc0;
- this.accessor1 = accessor1;
- this.tpc1 = tpc1;
+ this.accessorBuild = accessor1;
+ this.tpcBuild = tpc1;
+ this.accessorProbe = accessor0;
+ this.tpcProbe = tpc0;
appender = new FrameTupleAppender(ctx.getFrameSize());
tpComparator = comparator;
outBuffer = ctx.allocateFrame();
appender.reset(outBuffer, true);
+ this.isLeftOuter = isLeftOuter;
+ if (isLeftOuter) {
+ int fieldCountOuter = accessor1.getFieldCount();
+ nullTupleBuild = new ArrayTupleBuilder(fieldCountOuter);
+ DataOutput out = nullTupleBuild.getDataOutput();
+ for (int i = 0; i < fieldCountOuter; i++) {
+ nullWriters1[i].writeNull(out);
+ nullTupleBuild.addFieldEndOffset();
+ }
+ } else {
+ nullTupleBuild = null;
+ }
}
public void build(ByteBuffer buffer) throws HyracksDataException {
buffers.add(buffer);
int bIndex = buffers.size() - 1;
- accessor0.reset(buffer);
- int tCount = accessor0.getTupleCount();
+ accessorBuild.reset(buffer);
+ int tCount = accessorBuild.getTupleCount();
for (int i = 0; i < tCount; ++i) {
- int entry = tpc0.partition(accessor0, i, table.length);
+ int entry = tpcBuild.partition(accessorBuild, i, table.length);
long tPointer = (((long) bIndex) << 32) + i;
Link link = table[entry];
if (link == null) {
@@ -70,29 +88,41 @@
}
public void join(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
- accessor1.reset(buffer);
- int tupleCount1 = accessor1.getTupleCount();
- for (int i = 0; i < tupleCount1; ++i) {
- int entry = tpc1.partition(accessor1, i, table.length);
+ accessorProbe.reset(buffer);
+ int tupleCount0 = accessorProbe.getTupleCount();
+ for (int i = 0; i < tupleCount0; ++i) {
+ int entry = tpcProbe.partition(accessorProbe, i, table.length);
Link link = table[entry];
+ boolean matchFound = false;
if (link != null) {
for (int j = 0; j < link.size; ++j) {
long pointer = link.pointers[j];
int bIndex = (int) ((pointer >> 32) & 0xffffffff);
int tIndex = (int) (pointer & 0xffffffff);
- accessor0.reset(buffers.get(bIndex));
- int c = tpComparator.compare(accessor0, tIndex, accessor1, i);
+ accessorBuild.reset(buffers.get(bIndex));
+ int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
if (c == 0) {
- if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+ matchFound = true;
+ if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
flushFrame(outBuffer, writer);
appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+ if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
throw new IllegalStateException();
}
}
}
}
}
+ if (!matchFound && isLeftOuter) {
+                if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+                        nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+                    flushFrame(outBuffer, writer);
+                    appender.reset(outBuffer, true);
+                    if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+                            nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+ throw new IllegalStateException();
+ }
+ }
+ }
}
}
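The constructor now pre-builds a single all-null tuple through the supplied INullWriters; join appends it for any probe tuple that found no build-side match. A minimal sketch of what a null-writer pair could look like, assuming a one-byte null marker (the actual encoding is dictated by the job's serializers, not by this patch, and the exception-wrapping constructor is an assumption):

    import java.io.DataOutput;
    import java.io.IOException;

    import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
    import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;

    public class ByteMarkerNullWriterFactory implements INullWriterFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public INullWriter createNullWriter() {
            return new INullWriter() {
                @Override
                public void writeNull(DataOutput out) throws HyracksDataException {
                    try {
                        out.writeByte(0xFF); // hypothetical one-byte null marker
                    } catch (IOException e) {
                        throw new HyracksDataException(e);
                    }
                }
            };
        }
    }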
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 74f0146..13af25d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -46,6 +48,8 @@
private final int[] keys1;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
private final int tableSize;
public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
@@ -56,8 +60,25 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
- this.tableSize = tableSize;
recordDescriptors[0] = recordDescriptor;
+ this.isLeftOuter = false;
+ this.nullWriterFactories1 = null;
+ this.tableSize = tableSize;
+ }
+
+ public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
+ IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+ int tableSize) {
+ super(spec, 2, 1);
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ recordDescriptors[0] = recordDescriptor;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
+ this.tableSize = tableSize;
}
@Override
@@ -66,10 +87,11 @@
HashProbeActivityNode hpa = new HashProbeActivityNode();
builder.addTask(hba);
- builder.addSourceEdge(0, hba, 0);
+ builder.addSourceEdge(1, hba, 0);
builder.addTask(hpa);
- builder.addSourceEdge(1, hpa, 0);
+ builder.addSourceEdge(0, hpa, 0);
+
builder.addTargetEdge(0, hpa, 0);
builder.addBlockingEdge(hba, hpa);
@@ -88,6 +110,13 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private InMemoryHashJoin joiner;
@@ -99,7 +128,7 @@
.createPartitioner();
joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
- keys0, keys1, comparators));
+ keys0, keys1, comparators), isLeftOuter, nullWriters1);
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
new file mode 100644
index 0000000..5ee8fd5
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+
+public class NestedLoopJoin {
+ private final FrameTupleAccessor accessorInner;
+ private final FrameTupleAccessor accessorOuter;
+ private final FrameTupleAppender appender;
+ private final ITuplePairComparator tpComparator;
+ private final ByteBuffer outBuffer;
+ private final ByteBuffer innerBuffer;
+ private final List<ByteBuffer> outBuffers;
+ private final int memSize;
+ private final IHyracksStageletContext ctx;
+ private RunFileReader runFileReader;
+ private int currentMemSize = 0;
+ private final RunFileWriter runFileWriter;
+
+ public NestedLoopJoin(IHyracksStageletContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
+ ITuplePairComparator comparators, int memSize) throws HyracksDataException {
+ this.accessorInner = accessor1;
+ this.accessorOuter = accessor0;
+ this.appender = new FrameTupleAppender(ctx.getFrameSize());
+ this.tpComparator = comparators;
+ this.outBuffer = ctx.allocateFrame();
+ this.innerBuffer = ctx.allocateFrame();
+ this.appender.reset(outBuffer, true);
+ this.outBuffers = new ArrayList<ByteBuffer>();
+ this.memSize = memSize;
+ this.ctx = ctx;
+
+ FileReference file = ctx.getJobletContext().createWorkspaceFile(
+ this.getClass().getSimpleName() + this.toString());
+ runFileWriter = new RunFileWriter(file, ctx.getIOManager());
+ runFileWriter.open();
+ }
+
+ public void cache(ByteBuffer buffer) throws HyracksDataException {
+ runFileWriter.nextFrame(buffer);
+        // frames accumulate in the run file and are replayed against each outer window
+ }
+
+ public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException {
+ if (outBuffers.size() < memSize - 3) {
+ createAndCopyFrame(outerBuffer);
+ return;
+ }
+ if (currentMemSize < memSize - 3) {
+ reloadFrame(outerBuffer);
+ return;
+ }
+ for (ByteBuffer outBuffer : outBuffers) {
+ runFileReader = runFileWriter.createReader();
+ runFileReader.open();
+ while (runFileReader.nextFrame(innerBuffer)) {
+ blockJoin(outBuffer, innerBuffer, writer);
+ }
+ runFileReader.close();
+ }
+ currentMemSize = 0;
+ reloadFrame(outerBuffer);
+ }
+
+ private void createAndCopyFrame(ByteBuffer outerBuffer) {
+ ByteBuffer outerBufferCopy = ctx.allocateFrame();
+ FrameUtils.copy(outerBuffer, outerBufferCopy);
+ outBuffers.add(outerBufferCopy);
+ currentMemSize++;
+ }
+
+ private void reloadFrame(ByteBuffer outerBuffer) {
+ outBuffers.get(currentMemSize).clear();
+ FrameUtils.copy(outerBuffer, outBuffers.get(currentMemSize));
+ currentMemSize++;
+ }
+
+ private void blockJoin(ByteBuffer outerBuffer, ByteBuffer innerBuffer, IFrameWriter writer)
+ throws HyracksDataException {
+ accessorOuter.reset(outerBuffer);
+ accessorInner.reset(innerBuffer);
+ int tupleCount0 = accessorOuter.getTupleCount();
+ int tupleCount1 = accessorInner.getTupleCount();
+
+ for (int i = 0; i < tupleCount0; ++i) {
+ for (int j = 0; j < tupleCount1; ++j) {
+ int c = compare(accessorOuter, i, accessorInner, j);
+ if (c == 0) {
+ if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+ flushFrame(outBuffer, writer);
+ appender.reset(outBuffer, true);
+ if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
+ throw new IllegalStateException();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public void closeCache() throws HyracksDataException {
+ if (runFileWriter != null) {
+ runFileWriter.close();
+ }
+ }
+
+ public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+ for (ByteBuffer outBuffer : outBuffers) {
+ runFileReader = runFileWriter.createReader();
+ runFileReader.open();
+ while (runFileReader.nextFrame(innerBuffer)) {
+ blockJoin(outBuffer, innerBuffer, writer);
+ }
+ runFileReader.close();
+ }
+ outBuffers.clear();
+ currentMemSize = 0;
+
+ if (appender.getTupleCount() > 0) {
+ flushFrame(outBuffer, writer);
+ }
+ }
+
+ private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ writer.nextFrame(buffer);
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ }
+
+ private int compare(FrameTupleAccessor accessor0, int tIndex0, FrameTupleAccessor accessor1, int tIndex1)
+ throws HyracksDataException {
+        // delegate: the comparator already yields the full three-way ordering
+        return tpComparator.compare(accessor0, tIndex0, accessor1, tIndex1);
+ }
+}
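NestedLoopJoin streams the outer side through a bounded window of in-memory frames and replays the cached inner run file against each window; the supplied ITuplePairComparator decides which pairs are emitted. A hedged sketch of a comparator that equi-joins on one int field per side (parameter types follow the call sites in this file; field indices are illustrative):

    import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
    import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

    public class IntFieldPairComparator implements ITuplePairComparator {
        private final int f0, f1; // join field index on each side

        public IntFieldPairComparator(int f0, int f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        @Override
        public int compare(FrameTupleAccessor a0, int t0, FrameTupleAccessor a1, int t1)
                throws HyracksDataException {
            int v0 = IntegerSerializerDeserializer.getInt(a0.getBuffer().array(),
                    a0.getTupleStartOffset(t0) + a0.getFieldSlotsLength() + a0.getFieldStartOffset(t0, f0));
            int v1 = IntegerSerializerDeserializer.getInt(a1.getBuffer().array(),
                    a1.getTupleStartOffset(t1) + a1.getFieldSlotsLength() + a1.getFieldStartOffset(t1, f1));
            return v0 < v1 ? -1 : (v0 > v1 ? 1 : 0);
        }
    }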
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
new file mode 100644
index 0000000..1436e30
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+public class NestedLoopJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final String JOINER = "joiner";
+
+ private static final long serialVersionUID = 1L;
+ private final ITuplePairComparatorFactory comparatorFactory;
+ private final int memSize;
+
+ public NestedLoopJoinOperatorDescriptor(JobSpecification spec, ITuplePairComparatorFactory comparatorFactory,
+ RecordDescriptor recordDescriptor, int memSize) {
+ super(spec, 2, 1);
+ this.comparatorFactory = comparatorFactory;
+ this.recordDescriptors[0] = recordDescriptor;
+ this.memSize = memSize;
+ }
+
+ @Override
+ public void contributeTaskGraph(IActivityGraphBuilder builder) {
+ JoinCacheActivityNode jc = new JoinCacheActivityNode();
+ NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode();
+
+ builder.addTask(jc);
+ builder.addSourceEdge(1, jc, 0);
+
+ builder.addTask(nlj);
+ builder.addSourceEdge(0, nlj, 0);
+
+ builder.addTargetEdge(0, nlj, 0);
+ builder.addBlockingEdge(jc, nlj);
+ }
+
+ private class JoinCacheActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+ final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator();
+
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ private NestedLoopJoin joiner;
+
+ @Override
+ public void open() throws HyracksDataException {
+ joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+ new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(buffer, copyBuffer);
+ copyBuffer.flip();
+ joiner.cache(copyBuffer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ joiner.closeCache();
+ env.set(JOINER, joiner);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
+ };
+ return op;
+ }
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return NestedLoopJoinOperatorDescriptor.this;
+ }
+ }
+
+ private class NestedLoopJoinActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx,
+ final IOperatorEnvironment env, IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+
+ IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private NestedLoopJoin joiner;
+
+ @Override
+ public void open() throws HyracksDataException {
+ joiner = (NestedLoopJoin) env.get(JOINER);
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ joiner.join(buffer, writer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ joiner.closeJoin(writer);
+ writer.close();
+ env.set(JOINER, null);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ writer.flush();
+ }
+ };
+ return op;
+ }
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return NestedLoopJoinOperatorDescriptor.this;
+ }
+ }
+}
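Input 0 is streamed (the outer relation) while input 1 is first spilled to a run file by JoinCacheActivityNode, as the blocking edge above encodes. A hedged wiring sketch, assuming the usual JobSpecification.connect(...) job-building API; memSize here is illustrative:

    import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
    import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
    import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
    import edu.uci.ics.hyracks.api.job.JobSpecification;
    import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
    import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;

    final class NestedLoopJoinWiring {
        static NestedLoopJoinOperatorDescriptor wire(JobSpecification spec, IOperatorDescriptor outerSrc,
                IOperatorDescriptor innerSrc, ITuplePairComparatorFactory cmpf, RecordDescriptor outRec) {
            NestedLoopJoinOperatorDescriptor join =
                    new NestedLoopJoinOperatorDescriptor(spec, cmpf, outRec, 8 /* memSize frames */);
            spec.connect(new OneToOneConnectorDescriptor(spec), outerSrc, 0, join, 0); // streamed outer
            spec.connect(new OneToOneConnectorDescriptor(spec), innerSrc, 0, join, 1); // cached inner
            return join;
        }
    }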
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
new file mode 100644
index 0000000..f82a7ef
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -0,0 +1,65 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
+public class SplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public SplitOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc, int outputArity) {
+ super(spec, 1, outputArity);
+ for (int i = 0; i < outputArity; i++) {
+ recordDescriptors[i] = rDesc;
+ }
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx, IOperatorEnvironment env,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ return new AbstractUnaryInputOperatorNodePushable() {
+ private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+
+ @Override
+ public void close() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.close();
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
+
+ @Override
+            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+                for (IFrameWriter writer : writers) {
+                    FrameUtils.flushFrame(buffer, writer);
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.open();
+ }
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ writers[index] = writer;
+ }
+ };
+ }
+}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
index 603b194..3742e91 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -240,12 +240,12 @@
fta2.reset(buf2);
for (int f = 0; f < comparators.length; ++f) {
int fIdx = sortFields[f];
- int f1Start = fIdx == 0 ? 0 : buf1.getShort(j1 + (fIdx - 1) * 2);
- int f1End = buf1.getShort(j1 + fIdx * 2);
+ int f1Start = fIdx == 0 ? 0 : buf1.getInt(j1 + (fIdx - 1) * 4);
+ int f1End = buf1.getInt(j1 + fIdx * 4);
int s1 = j1 + fta1.getFieldSlotsLength() + f1Start;
int l1 = f1End - f1Start;
- int f2Start = fIdx == 0 ? 0 : buf2.getShort(j2 + (fIdx - 1) * 2);
- int f2End = buf2.getShort(j2 + fIdx * 2);
+ int f2Start = fIdx == 0 ? 0 : buf2.getInt(j2 + (fIdx - 1) * 4);
+ int f2End = buf2.getInt(j2 + fIdx * 4);
int s2 = j2 + fta2.getFieldSlotsLength() + f2Start;
int l2 = f2End - f2Start;
int c = comparators[f].compare(b1, s1, l1, b2, s2, l2);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
new file mode 100644
index 0000000..eccb945
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.union;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+
+public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
+ public UnionAllOperatorDescriptor(JobSpecification spec, int nInputs, RecordDescriptor recordDescriptor) {
+ super(spec, nInputs, 1);
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void contributeTaskGraph(IActivityGraphBuilder builder) {
+ UnionActivityNode uba = new UnionActivityNode();
+ builder.addTask(uba);
+ for (int i = 0; i < inputArity; ++i) {
+ builder.addSourceEdge(i, uba, i);
+ }
+ builder.addTargetEdge(0, uba, 0);
+ }
+
+ private class UnionActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public UnionActivityNode() {
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksStageletContext ctx, IOperatorEnvironment env,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ return new UnionOperator(ctx, inRecordDesc);
+ }
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return UnionAllOperatorDescriptor.this;
+ }
+
+ }
+
+ private class UnionOperator extends AbstractUnaryOutputOperatorNodePushable {
+ private int nOpened;
+
+ private int nClosed;
+
+ public UnionOperator(IHyracksStageletContext ctx, RecordDescriptor inRecordDesc) {
+ nOpened = 0;
+ nClosed = 0;
+ }
+
+ @Override
+ public int getInputArity() {
+ return inputArity;
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return new IFrameWriter() {
+ @Override
+ public void open() throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ if (++nOpened == 1) {
+ writer.open();
+ }
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ writer.nextFrame(buffer);
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ writer.flush();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ synchronized (UnionOperator.this) {
+ if (++nClosed == inputArity) {
+ writer.close();
+ }
+ }
+ }
+ };
+ }
+ }
+}
\ No newline at end of file
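UnionAllOperatorDescriptor fans any number of inputs into one output: the first open() opens the downstream writer, frames are forwarded under a shared lock, and the last close() closes it. A hedged wiring sketch under the same assumed job-building API as above:

    import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
    import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
    import edu.uci.ics.hyracks.api.job.JobSpecification;
    import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
    import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;

    final class UnionAllWiring {
        static UnionAllOperatorDescriptor unionAll(JobSpecification spec, RecordDescriptor rDesc,
                IOperatorDescriptor... sources) {
            UnionAllOperatorDescriptor union = new UnionAllOperatorDescriptor(spec, sources.length, rDesc);
            for (int i = 0; i < sources.length; i++) {
                // each source occupies one input port of the union
                spec.connect(new OneToOneConnectorDescriptor(spec), sources[i], 0, union, i);
            }
            return union;
        }
    }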