Merged in all changes from Aqua/Asterix: adds left-outer-join support (INullWriter/INullWriterFactory) to the in-memory, Grace, and hybrid hash joins; renames the join relations to build/probe roles and reorders the input edges so input 0 feeds the probe side; adds FrameTupleAppender.appendConcat overloads for raw-byte tuples; introduces SplitOperatorDescriptor; and extends the inverted-index tokenizers with getNumTokens() and an optional type tag.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_aqua_changes@405 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 4ec1b6f..7647e50 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -118,30 +118,32 @@
return false;
}
- public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots, byte[] bytes, int offset,
- int length) {
+ public boolean appendConcat(IFrameTupleAccessor accessor0, int tIndex0, int[] fieldSlots1, byte[] bytes1, int offset1,
+ int dataLen1) {
int startOffset0 = accessor0.getTupleStartOffset(tIndex0);
int endOffset0 = accessor0.getTupleEndOffset(tIndex0);
int length0 = endOffset0 - startOffset0;
- if (tupleDataEndOffset + length0 + fieldSlots.length * 4 + length + 4 + (tupleCount + 1) * 4 <= frameSize) {
+ int slotsLen1 = fieldSlots1.length * 4;
+ int length1 = slotsLen1 + dataLen1;
+
+ if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
ByteBuffer src0 = accessor0.getBuffer();
int slotsLen0 = accessor0.getFieldSlotsLength();
int dataLen0 = length0 - slotsLen0;
// Copy slots from accessor0 verbatim
System.arraycopy(src0.array(), startOffset0, buffer.array(), tupleDataEndOffset, slotsLen0);
- // Copy slots from fieldSlots with the following transformation:
- // newSlotIdx = oldSlotIdx + dataLen0
- for (int i = 0; i < fieldSlots.length; ++i) {
- buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots[i] + dataLen0));
+ // Copy fieldSlots1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+ for (int i = 0; i < fieldSlots1.length; ++i) {
+ buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, (fieldSlots1[i] + dataLen0));
}
// Copy data0
System.arraycopy(src0.array(), startOffset0 + slotsLen0, buffer.array(), tupleDataEndOffset + slotsLen0
- + fieldSlots.length * 4, dataLen0);
- // Copy bytes
- System.arraycopy(bytes, offset, buffer.array(), tupleDataEndOffset + slotsLen0 + fieldSlots.length * 4
- + dataLen0, length);
- tupleDataEndOffset += (length0 + fieldSlots.length * 4 + length);
+ + slotsLen1, dataLen0);
+ // Copy bytes1
+            System.arraycopy(bytes1, offset1, buffer.array(), tupleDataEndOffset + slotsLen0 + slotsLen1
+                    + dataLen0, dataLen1);
+ tupleDataEndOffset += (length0 + length1);
buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
++tupleCount;
buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
@@ -150,6 +152,41 @@
return false;
}
+ public boolean appendConcat(int[] fieldSlots0, byte[] bytes0, int offset0, int dataLen0, IFrameTupleAccessor accessor1,
+ int tIndex1) {
+ int slotsLen0 = fieldSlots0.length * 4;
+ int length0 = slotsLen0 + dataLen0;
+
+ int startOffset1 = accessor1.getTupleStartOffset(tIndex1);
+ int endOffset1 = accessor1.getTupleEndOffset(tIndex1);
+ int length1 = endOffset1 - startOffset1;
+
+ if (tupleDataEndOffset + length0 + length1 + 4 + (tupleCount + 1) * 4 <= frameSize) {
+ ByteBuffer src1 = accessor1.getBuffer();
+ int slotsLen1 = accessor1.getFieldSlotsLength();
+ int dataLen1 = length1 - slotsLen1;
+            // Write fieldSlots0 unchanged: they are already relative to the start of data0
+ for (int i = 0; i < fieldSlots0.length; ++i) {
+ buffer.putInt(tupleDataEndOffset + i * 4, fieldSlots0[i]);
+ }
+ // Copy slots from accessor1 with the following transformation: newSlotIdx = oldSlotIdx + dataLen0
+ for (int i = 0; i < slotsLen1 / 4; ++i) {
+ buffer.putInt(tupleDataEndOffset + slotsLen0 + i * 4, src1.getInt(startOffset1 + i * 4) + dataLen0);
+ }
+ // Copy bytes0
+ System.arraycopy(bytes0, offset0, buffer.array(), tupleDataEndOffset + slotsLen0 + slotsLen1,
+ dataLen0);
+ // Copy data1
+ System.arraycopy(src1.array(), startOffset1 + slotsLen1, buffer.array(), tupleDataEndOffset + slotsLen0
+ + slotsLen1 + dataLen0, dataLen1);
+ tupleDataEndOffset += (length0 + length1);
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize) - 4 * (tupleCount + 1), tupleDataEndOffset);
+ ++tupleCount;
+ buffer.putInt(FrameHelper.getTupleCountOffset(frameSize), tupleCount);
+ return true;
+ }
+ return false;
+ }
public boolean appendProjection(IFrameTupleAccessor accessor, int tIndex, int[] fields) {
int fTargetSlotsLength = fields.length * 4;
int length = fTargetSlotsLength;
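
For context, a minimal self-contained sketch of the capacity check both appendConcat overloads perform (all numeric values below are hypothetical): each tuple is stored as 4-byte field-end slots followed by field data, tuple end offsets grow backwards from the frame's tail, and a concatenated tuple fits only if there is room for both tuples' slots and data, 4 bytes for its end offset, and one more entry in the tuple-offset array.

    // Sketch of FrameTupleAppender's capacity arithmetic; values are illustrative only.
    public final class AppendConcatFitCheck {
        public static void main(String[] args) {
            int frameSize = 32768;           // assumed frame size in bytes
            int tupleDataEndOffset = 32000;  // bytes already occupied by tuple data
            int tupleCount = 100;            // tuples already appended

            int length0 = 600;               // slots + data of the accessor-side tuple
            int slotsLen1 = 3 * 4;           // three 4-byte field-end slots
            int dataLen1 = 40;               // payload bytes
            int length1 = slotsLen1 + dataLen1;

            // 4 bytes for the new tuple's end offset, plus (tupleCount + 1) * 4
            // bytes of tuple-offset entries at the frame's tail.
            boolean fits = tupleDataEndOffset + length0 + length1 + 4
                    + (tupleCount + 1) * 4 <= frameSize;
            System.out.println(fits ? "append succeeds" : "append returns false");
        }
    }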
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index bff101e..1a69952 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -44,8 +46,8 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final String SMALLRELATION = "RelR";
- private static final String LARGERELATION = "RelS";
+ private static final String RELATION0 = "Rel0";
+ private static final String RELATION1 = "Rel1";
private static final long serialVersionUID = 1L;
private final int[] keys0;
@@ -56,6 +58,8 @@
private final double factor;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
@@ -69,24 +73,44 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = false;
+ this.nullWriterFactories1 = null;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ public GraceHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+ INullWriterFactory[] nullWriterFactories1) {
+ super(spec, 2, 1);
+ this.memsize = memsize;
+ this.inputsize0 = inputsize0;
+ this.recordsPerFrame = recordsPerFrame;
+ this.factor = factor;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
recordDescriptors[0] = recordDescriptor;
}
@Override
public void contributeTaskGraph(IActivityGraphBuilder builder) {
- HashPartitionActivityNode rpart = new HashPartitionActivityNode(SMALLRELATION, keys0, 0);
- HashPartitionActivityNode spart = new HashPartitionActivityNode(LARGERELATION, keys1, 1);
+ HashPartitionActivityNode part0 = new HashPartitionActivityNode(RELATION0, keys0, 0);
+ HashPartitionActivityNode part1 = new HashPartitionActivityNode(RELATION1, keys1, 1);
JoinActivityNode join = new JoinActivityNode();
- builder.addTask(rpart);
- builder.addSourceEdge(0, rpart, 0);
+ builder.addTask(part0);
+ builder.addSourceEdge(0, part0, 0);
- builder.addTask(spart);
- builder.addSourceEdge(1, spart, 0);
+ builder.addTask(part1);
+ builder.addSourceEdge(1, part1, 0);
builder.addTask(join);
- builder.addBlockingEdge(rpart, spart);
- builder.addBlockingEdge(spart, join);
+ builder.addBlockingEdge(part0, part1);
+ builder.addBlockingEdge(part1, join);
builder.addTargetEdge(0, join, 0);
}
@@ -217,18 +241,24 @@
}
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
private InMemoryHashJoin joiner;
- private RunFileWriter[] rWriters;
- private RunFileWriter[] sWriters;
+ private RunFileWriter[] buildWriters;
+ private RunFileWriter[] probeWriters;
private final int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
@Override
public void initialize() throws HyracksDataException {
- rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
- sWriters = (RunFileWriter[]) env.get(LARGERELATION);
+ buildWriters = (RunFileWriter[]) env.get(RELATION1);
+ probeWriters = (RunFileWriter[]) env.get(RELATION0);
ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(numPartitions,
new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)).createPartitioner();
@@ -241,34 +271,36 @@
// buffer
int tableSize = (int) (numPartitions * recordsPerFrame * factor);
for (int partitionid = 0; partitionid < numPartitions; partitionid++) {
- RunFileWriter rWriter = rWriters[partitionid];
- RunFileWriter sWriter = sWriters[partitionid];
- if (rWriter == null || sWriter == null) {
+ RunFileWriter buildWriter = buildWriters[partitionid];
+ RunFileWriter probeWriter = probeWriters[partitionid];
+ if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
- new FrameTuplePairComparator(keys0, keys1, comparators));
+ new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1);
// build
- RunFileReader rReader = rWriter.createReader();
- rReader.open();
- while (rReader.nextFrame(buffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame();
- FrameUtils.copy(buffer, copyBuffer);
- joiner.build(copyBuffer);
- buffer.clear();
+ if (buildWriter != null) {
+ RunFileReader buildReader = buildWriter.createReader();
+ buildReader.open();
+ while (buildReader.nextFrame(buffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(buffer, copyBuffer);
+ joiner.build(copyBuffer);
+ buffer.clear();
+ }
+ buildReader.close();
}
- rReader.close();
// probe
- RunFileReader sReader = sWriter.createReader();
- sReader.open();
- while (sReader.nextFrame(buffer)) {
+ RunFileReader probeReader = probeWriter.createReader();
+ probeReader.open();
+ while (probeReader.nextFrame(buffer)) {
joiner.join(buffer, writer);
buffer.clear();
}
- sReader.close();
+ probeReader.close();
joiner.closeJoin(writer);
}
writer.close();
@@ -276,8 +308,8 @@
@Override
public void deinitialize() throws HyracksDataException {
- env.set(LARGERELATION, null);
- env.set(SMALLRELATION, null);
+ env.set(RELATION1, null);
+ env.set(RELATION0, null);
}
};
return op;
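
As a quick illustration of the partition count used above — numPartitions = ceil(sqrt(inputsize0 * factor / nPartitions)) — here is a sketch using the parameter values from the new customerOrderCIDGraceHashLeftOuterJoin integration test:

    // Sketch of the numPartitions formula in JoinActivityNode; values mirror
    // the new Grace left-outer-join test (inputsize0 = 20, factor = 1.2).
    public final class GracePartitionCount {
        public static void main(String[] args) {
            int inputsize0 = 20;   // estimated input size in frames
            double factor = 1.2;   // fudge factor
            int nPartitions = 1;   // join activity's degree of parallelism
            int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
            System.out.println(numPartitions); // prints 5
        }
    }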
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index d7c1086..516bb88 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -46,19 +48,21 @@
public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final String JOINER0 = "joiner0";
- private static final String SMALLRELATION = "RelR";
- private static final String LARGERELATION = "RelS";
+ private static final String BUILDRELATION = "BuildRel";
+ private static final String PROBERELATION = "ProbeRel";
private static final String MEM_HASHTABLE = "MEMORY_HASHTABLE";
private static final String NUM_PARTITION = "NUMBER_B_PARTITIONS"; // B
private final int memsize;
private static final long serialVersionUID = 1L;
private final int inputsize0;
private final double factor;
+ private final int recordsPerFrame;
private final int[] keys0;
private final int[] keys1;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
- private final int recordsPerFrame;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
/**
* @param spec
@@ -88,19 +92,39 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = false;
+ this.nullWriterFactories1 = null;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ public HybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+ INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+ super(spec, 2, 1);
+ this.memsize = memsize;
+ this.inputsize0 = inputsize0;
+ this.factor = factor;
+ this.recordsPerFrame = recordsPerFrame;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
recordDescriptors[0] = recordDescriptor;
}
@Override
public void contributeTaskGraph(IActivityGraphBuilder builder) {
- BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(SMALLRELATION);
- PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(LARGERELATION);
+ BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(BUILDRELATION);
+ PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(PROBERELATION);
builder.addTask(phase1);
- builder.addSourceEdge(0, phase1, 0);
+ builder.addSourceEdge(1, phase1, 0);
builder.addTask(phase2);
- builder.addSourceEdge(1, phase2, 0);
+ builder.addSourceEdge(0, phase2, 0);
builder.addBlockingEdge(phase1, phase2);
@@ -127,12 +151,18 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private InMemoryHashJoin joiner0;
- private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
- ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
- .createPartitioner();
+ private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
+ private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
+ hashFunctionFactories).createPartitioner();
private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
private ByteBuffer[] bufferForPartitions;
@@ -148,8 +178,8 @@
for (int i = 0; i < B; i++) {
ByteBuffer buf = bufferForPartitions[i];
- accessor0.reset(buf);
- if (accessor0.getTupleCount() > 0) {
+ accessorBuild.reset(buf);
+ if (accessorBuild.getTupleCount() > 0) {
write(i, buf);
}
closeWriter(i);
@@ -165,17 +195,17 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (memoryForHashtable != memsize - 2) {
- accessor0.reset(buffer);
- int tCount = accessor0.getTupleCount();
+ accessorBuild.reset(buffer);
+ int tCount = accessorBuild.getTupleCount();
for (int i = 0; i < tCount; ++i) {
int entry = -1;
if (memoryForHashtable == 0) {
- entry = hpc0.partition(accessor0, i, B);
+ entry = hpcBuild.partition(accessorBuild, i, B);
boolean newBuffer = false;
ByteBuffer bufBi = bufferForPartitions[entry];
while (true) {
appender.reset(bufBi, newBuffer);
- if (appender.append(accessor0, i)) {
+ if (appender.append(accessorBuild, i)) {
break;
} else {
write(entry, bufBi);
@@ -184,15 +214,16 @@
}
}
} else {
- entry = hpc0.partition(accessor0, i, (int) (inputsize0 * factor / nPartitions));
+ entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
if (entry < memoryForHashtable) {
while (true) {
- if (!ftappender.append(accessor0, i)) {
+ if (!ftappender.append(accessorBuild, i)) {
build(inBuffer);
ftappender.reset(inBuffer, true);
- } else
+ } else {
break;
+ }
}
} else {
entry %= B;
@@ -200,7 +231,7 @@
ByteBuffer bufBi = bufferForPartitions[entry];
while (true) {
appender.reset(bufBi, newBuffer);
- if (appender.append(accessor0, i)) {
+ if (appender.append(accessorBuild, i)) {
break;
} else {
write(entry, bufBi);
@@ -255,7 +286,7 @@
int tableSize = (int) (memoryForHashtable * recordsPerFrame * factor);
joiner0 = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
- keys0, keys1, comparators));
+ keys0, keys1, comparators), isLeftOuter, nullWriters1);
bufferForPartitions = new ByteBuffer[B];
fWriters = new RunFileWriter[B];
for (int i = 0; i < B; i++) {
@@ -298,11 +329,11 @@
private class PartitionAndJoinActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- private String largeRelation;
+ private String relationName;
public PartitionAndJoinActivityNode(String relationName) {
super();
- this.largeRelation = relationName;
+ this.relationName = relationName;
}
@Override
@@ -315,22 +346,28 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private InMemoryHashJoin joiner0;
- private final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
- private ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
+ private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+ private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
hashFunctionFactories);
- private ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
+ private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
hashFunctionFactories);
- ITuplePartitionComputer hpc1 = hpcf1.createPartitioner();
+ private final ITuplePartitionComputer hpcProbe = hpcf0.createPartitioner();
private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
private final FrameTupleAppender ftap = new FrameTupleAppender(ctx.getFrameSize());
private final ByteBuffer inBuffer = ctx.allocateFrame();
private final ByteBuffer outBuffer = ctx.allocateFrame();
- private RunFileWriter[] rWriters;
- private RunFileWriter[] sWriters;
+ private RunFileWriter[] buildWriters;
+ private RunFileWriter[] probeWriters;
private ByteBuffer[] bufferForPartitions;
private int B;
private int memoryForHashtable;
@@ -339,10 +376,10 @@
public void open() throws HyracksDataException {
joiner0 = (InMemoryHashJoin) env.get(JOINER0);
writer.open();
- rWriters = (RunFileWriter[]) env.get(SMALLRELATION);
+ buildWriters = (RunFileWriter[]) env.get(BUILDRELATION);
B = (Integer) env.get(NUM_PARTITION);
memoryForHashtable = (Integer) env.get(MEM_HASHTABLE);
- sWriters = new RunFileWriter[B];
+ probeWriters = new RunFileWriter[B];
bufferForPartitions = new ByteBuffer[B];
for (int i = 0; i < B; i++) {
bufferForPartitions[i] = ctx.allocateFrame();
@@ -354,18 +391,18 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (memoryForHashtable != memsize - 2) {
- accessor1.reset(buffer);
- int tupleCount1 = accessor1.getTupleCount();
- for (int i = 0; i < tupleCount1; ++i) {
+ accessorProbe.reset(buffer);
+ int tupleCount0 = accessorProbe.getTupleCount();
+ for (int i = 0; i < tupleCount0; ++i) {
int entry = -1;
if (memoryForHashtable == 0) {
- entry = hpc1.partition(accessor1, i, B);
+ entry = hpcProbe.partition(accessorProbe, i, B);
boolean newBuffer = false;
ByteBuffer outbuf = bufferForPartitions[entry];
while (true) {
appender.reset(outbuf, newBuffer);
- if (appender.append(accessor1, i)) {
+ if (appender.append(accessorProbe, i)) {
break;
} else {
write(entry, outbuf);
@@ -374,10 +411,10 @@
}
}
} else {
- entry = hpc1.partition(accessor1, i, (int) (inputsize0 * factor / nPartitions));
+ entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
if (entry < memoryForHashtable) {
while (true) {
- if (!ftap.append(accessor1, i)) {
+ if (!ftap.append(accessorProbe, i)) {
joiner0.join(inBuffer, writer);
ftap.reset(inBuffer, true);
} else
@@ -390,7 +427,7 @@
ByteBuffer outbuf = bufferForPartitions[entry];
while (true) {
appender.reset(outbuf, newBuffer);
- if (appender.append(accessor1, i)) {
+ if (appender.append(accessorProbe, i)) {
break;
} else {
write(entry, outbuf);
@@ -415,8 +452,8 @@
if (memoryForHashtable != memsize - 2) {
for (int i = 0; i < B; i++) {
ByteBuffer buf = bufferForPartitions[i];
- accessor1.reset(buf);
- if (accessor1.getTupleCount() > 0) {
+ accessorProbe.reset(buf);
+ if (accessorProbe.getTupleCount() > 0) {
write(i, buf);
}
closeWriter(i);
@@ -430,40 +467,43 @@
tableSize = (int) (memsize * recordsPerFrame * factor);
}
for (int partitionid = 0; partitionid < B; partitionid++) {
- RunFileWriter rWriter = rWriters[partitionid];
- RunFileWriter sWriter = sWriters[partitionid];
- if (rWriter == null || sWriter == null) {
+ RunFileWriter buildWriter = buildWriters[partitionid];
+ RunFileWriter probeWriter = probeWriters[partitionid];
+ if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
continue;
}
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
- hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators));
+ hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
+ nullWriters1);
- RunFileReader rReader = rWriter.createReader();
- rReader.open();
- while (rReader.nextFrame(inBuffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame();
- FrameUtils.copy(inBuffer, copyBuffer);
- joiner.build(copyBuffer);
- inBuffer.clear();
+ if (buildWriter != null) {
+ RunFileReader buildReader = buildWriter.createReader();
+ buildReader.open();
+ while (buildReader.nextFrame(inBuffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(inBuffer, copyBuffer);
+ joiner.build(copyBuffer);
+ inBuffer.clear();
+ }
+ buildReader.close();
}
- rReader.close();
// probe
- RunFileReader sReader = sWriter.createReader();
- sReader.open();
- while (sReader.nextFrame(inBuffer)) {
+ RunFileReader probeReader = probeWriter.createReader();
+ probeReader.open();
+ while (probeReader.nextFrame(inBuffer)) {
joiner.join(inBuffer, writer);
inBuffer.clear();
}
- sReader.close();
+ probeReader.close();
joiner.closeJoin(writer);
}
}
writer.close();
- env.set(LARGERELATION, null);
- env.set(SMALLRELATION, null);
+ env.set(PROBERELATION, null);
+ env.set(BUILDRELATION, null);
env.set(JOINER0, null);
env.set(MEM_HASHTABLE, null);
env.set(NUM_PARTITION, null);
@@ -475,19 +515,19 @@
}
private void closeWriter(int i) throws HyracksDataException {
- RunFileWriter writer = sWriters[i];
+ RunFileWriter writer = probeWriters[i];
if (writer != null) {
writer.close();
}
}
private void write(int i, ByteBuffer head) throws HyracksDataException {
- RunFileWriter writer = sWriters[i];
+ RunFileWriter writer = probeWriters[i];
if (writer == null) {
- FileReference file = ctx.createWorkspaceFile(largeRelation);
+ FileReference file = ctx.createWorkspaceFile(relationName);
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
- sWriters[i] = writer;
+ probeWriters[i] = writer;
}
writer.nextFrame(head);
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index 4d98c6a..94a9501 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.dataflow.std.join;
+import java.io.DataOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -21,8 +22,10 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
@@ -30,36 +33,51 @@
public class InMemoryHashJoin {
private final Link[] table;
private final List<ByteBuffer> buffers;
- private final FrameTupleAccessor accessor0;
- private final ITuplePartitionComputer tpc0;
- private final FrameTupleAccessor accessor1;
- private final ITuplePartitionComputer tpc1;
+ private final FrameTupleAccessor accessorBuild;
+ private final ITuplePartitionComputer tpcBuild;
+ private final FrameTupleAccessor accessorProbe;
+ private final ITuplePartitionComputer tpcProbe;
private final FrameTupleAppender appender;
private final FrameTuplePairComparator tpComparator;
private final ByteBuffer outBuffer;
-
+ private final boolean isLeftOuter;
+ private final ArrayTupleBuilder nullTupleBuild;
+
public InMemoryHashJoin(IHyracksStageletContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
- FrameTuplePairComparator comparator) {
+ FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1)
+ throws HyracksDataException {
table = new Link[tableSize];
buffers = new ArrayList<ByteBuffer>();
- this.accessor0 = accessor0;
- this.tpc0 = tpc0;
- this.accessor1 = accessor1;
- this.tpc1 = tpc1;
+ this.accessorBuild = accessor1;
+ this.tpcBuild = tpc1;
+ this.accessorProbe = accessor0;
+ this.tpcProbe = tpc0;
appender = new FrameTupleAppender(ctx.getFrameSize());
tpComparator = comparator;
outBuffer = ctx.allocateFrame();
appender.reset(outBuffer, true);
+ this.isLeftOuter = isLeftOuter;
+        if (isLeftOuter) {
+            // Pre-build a single tuple of nulls for the build (inner) side; it is
+            // appended after every probe tuple that finds no match.
+            int fieldCountBuild = accessor1.getFieldCount();
+            nullTupleBuild = new ArrayTupleBuilder(fieldCountBuild);
+            DataOutput out = nullTupleBuild.getDataOutput();
+            for (int i = 0; i < fieldCountBuild; i++) {
+                nullWriters1[i].writeNull(out);
+                nullTupleBuild.addFieldEndOffset();
+            }
+        } else {
+            nullTupleBuild = null;
+        }
}
public void build(ByteBuffer buffer) throws HyracksDataException {
buffers.add(buffer);
int bIndex = buffers.size() - 1;
- accessor0.reset(buffer);
- int tCount = accessor0.getTupleCount();
+ accessorBuild.reset(buffer);
+ int tCount = accessorBuild.getTupleCount();
for (int i = 0; i < tCount; ++i) {
- int entry = tpc0.partition(accessor0, i, table.length);
+ int entry = tpcBuild.partition(accessorBuild, i, table.length);
long tPointer = (((long) bIndex) << 32) + i;
Link link = table[entry];
if (link == null) {
@@ -70,29 +88,41 @@
}
public void join(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
- accessor1.reset(buffer);
- int tupleCount1 = accessor1.getTupleCount();
- for (int i = 0; i < tupleCount1; ++i) {
- int entry = tpc1.partition(accessor1, i, table.length);
+ accessorProbe.reset(buffer);
+ int tupleCount0 = accessorProbe.getTupleCount();
+ for (int i = 0; i < tupleCount0; ++i) {
+ int entry = tpcProbe.partition(accessorProbe, i, table.length);
Link link = table[entry];
+ boolean matchFound = false;
if (link != null) {
for (int j = 0; j < link.size; ++j) {
long pointer = link.pointers[j];
int bIndex = (int) ((pointer >> 32) & 0xffffffff);
int tIndex = (int) (pointer & 0xffffffff);
- accessor0.reset(buffers.get(bIndex));
- int c = tpComparator.compare(accessor0, tIndex, accessor1, i);
+ accessorBuild.reset(buffers.get(bIndex));
+ int c = tpComparator.compare(accessorProbe, i, accessorBuild, tIndex);
if (c == 0) {
- if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+ matchFound = true;
+ if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
flushFrame(outBuffer, writer);
appender.reset(outBuffer, true);
- if (!appender.appendConcat(accessor0, tIndex, accessor1, i)) {
+ if (!appender.appendConcat(accessorProbe, i, accessorBuild, tIndex)) {
throw new IllegalStateException();
}
}
}
}
}
+            if (!matchFound && isLeftOuter) {
+                // Unmatched probe tuple: emit it concatenated with the pre-built null tuple.
+                if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+                        nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+                    flushFrame(outBuffer, writer);
+                    appender.reset(outBuffer, true);
+                    if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
+                            nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
+                        throw new IllegalStateException();
+                    }
+                }
+            }
}
}
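
A minimal sketch of an INullWriterFactory that could feed the nullTupleBuild above: it pads each missing build-side field with a zero-length UTF-8 string, matching the 2-byte length convention noted in DelimitedUTF8StringBinaryTokenizer.writeToken. The class name is hypothetical; java.io.DataOutput, java.io.IOException, and the INullWriter types are assumed to be imported, and HyracksDataException is assumed to wrap the IOException.

    // Hypothetical factory: pads unmatched build-side fields with an empty
    // UTF-8 string, i.e. a 2-byte length of zero.
    class EmptyStringNullWriterFactory implements INullWriterFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public INullWriter createNullWriter() {
            return new INullWriter() {
                @Override
                public void writeNull(DataOutput out) throws HyracksDataException {
                    try {
                        out.writeShort(0); // zero-length UTF-8 string
                    } catch (IOException e) {
                        throw new HyracksDataException(e);
                    }
                }
            };
        }
    }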
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 74f0146..13af25d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -23,6 +23,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -46,6 +48,8 @@
private final int[] keys1;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
private final int tableSize;
public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
@@ -56,8 +60,25 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
- this.tableSize = tableSize;
recordDescriptors[0] = recordDescriptor;
+ this.isLeftOuter = false;
+ this.nullWriterFactories1 = null;
+ this.tableSize = tableSize;
+ }
+
+ public InMemoryHashJoinOperatorDescriptor(JobSpecification spec, int[] keys0, int[] keys1,
+ IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+ int tableSize) {
+ super(spec, 2, 1);
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ recordDescriptors[0] = recordDescriptor;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
+ this.tableSize = tableSize;
}
@Override
@@ -66,10 +87,11 @@
HashProbeActivityNode hpa = new HashProbeActivityNode();
builder.addTask(hba);
- builder.addSourceEdge(0, hba, 0);
+ builder.addSourceEdge(1, hba, 0);
builder.addTask(hpa);
- builder.addSourceEdge(1, hpa, 0);
+ builder.addSourceEdge(0, hpa, 0);
+
builder.addTargetEdge(0, hpa, 0);
builder.addBlockingEdge(hba, hpa);
@@ -88,6 +110,13 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if (isLeftOuter) {
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private InMemoryHashJoin joiner;
@@ -99,7 +128,7 @@
.createPartitioner();
joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
hpc0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(
- keys0, keys1, comparators));
+ keys0, keys1, comparators), isLeftOuter, nullWriters1);
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
new file mode 100644
index 0000000..f82a7ef
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -0,0 +1,65 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
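+/**
+ * Forwards each incoming frame, unchanged, to every one of its outputs. All
+ * outputs share the input's RecordDescriptor.
+ */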
+public class SplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public SplitOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc, int outputArity) {
+ super(spec, 1, outputArity);
+ for (int i = 0; i < outputArity; i++) {
+ recordDescriptors[i] = rDesc;
+ }
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksStageletContext ctx, IOperatorEnvironment env,
+ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ return new AbstractUnaryInputOperatorNodePushable() {
+ private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+
+ @Override
+ public void close() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.close();
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
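+                // Frames are forwarded as they arrive; nothing is buffered here.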
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ FrameUtils.flushFrame(bufferAccessor, writer);
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ for (IFrameWriter writer : writers) {
+ writer.open();
+ }
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ writers[index] = writer;
+ }
+ };
+ }
+}
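
A usage sketch for the new operator (scanner, sinkA, sinkB, and desc are hypothetical stand-ins for operators and a RecordDescriptor defined elsewhere in a job):

    // Fan one stream out to two identical consumers.
    SplitOperatorDescriptor split = new SplitOperatorDescriptor(spec, desc, 2);
    spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, split, 0);
    spec.connect(new OneToOneConnectorDescriptor(spec), split, 0, sinkA, 0);
    spec.connect(new OneToOneConnectorDescriptor(spec), split, 1, sinkB, 0);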
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 4958a35..26b3dab 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.tests.integration;
+import java.io.DataOutput;
import java.io.File;
import org.junit.Test;
@@ -23,8 +24,11 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.comparators.UTF8StringBinaryComparatorFactory;
@@ -51,6 +55,24 @@
public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
private static final boolean DEBUG = true;
+    private static class NoopNullWriterFactory implements INullWriterFactory {
+
+ private static final long serialVersionUID = 1L;
+ public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory();
+
+ private NoopNullWriterFactory() {
+ }
+
+ @Override
+ public INullWriter createNullWriter() {
+ return new INullWriter() {
+ @Override
+ public void writeNull(DataOutput out) throws HyracksDataException {
+ }
+ };
+ }
+ }
+
/*
* TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL ); TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL, O_COMMENT VARCHAR(79) NOT NULL );
*/
@@ -274,6 +296,251 @@
}
@Test
+ public void customerOrderCIDInMemoryHashLeftOuterJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/customer.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/orders.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
+ new int[] { 1 }, new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+ nullWriterFactories, 128);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ // "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+ // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDGraceHashLeftOuterJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/customer.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/orders.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
+ GraceHashJoinOperatorDescriptor join = new GraceHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+ new int[] { 0 }, new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+ nullWriterFactories);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ // "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+ // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
+ public void customerOrderCIDHybridHashLeftOuterJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ "data/tpch0.001/customer.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ "data/tpch0.001/orders.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[ordersDesc.getFields().length];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
+ HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+ new int[] { 0 }, new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { UTF8StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, true,
+ nullWriterFactories);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
+
+ IOperatorDescriptor printer = DEBUG ? new PrinterOperatorDescriptor(spec)
+ : new NullSinkOperatorDescriptor(spec);
+ // FileSplit[] custOrdersJoinSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ // "data/tpch0.001/custOrdersLeftOuterJoin.csv"))) };
+ // LineFileWriteOperatorDescriptor printer = new LineFileWriteOperatorDescriptor(spec, custOrdersJoinSplits);
+ // PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
public void customerOrderCIDJoinMulti() throws Exception {
JobSpecification spec = new JobSpecification();
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
index 40cb7da..0d5dc8f 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/api/IBinaryTokenizer.java
@@ -32,6 +32,8 @@
public int getTokenLength();
+ public int getNumTokens();
+
public void writeToken(DataOutput dos) throws IOException;
public RecordDescriptor getTokenSchema();
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
index 73635f9..425436b 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizer.java
@@ -30,6 +30,7 @@
new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
private final char delimiter;
+ private final byte typeTag;
private byte[] data;
private int start;
private int length;
@@ -38,8 +39,14 @@
private int tokenStart;
private int pos;
+ public DelimitedUTF8StringBinaryTokenizer(char delimiter, byte typeTag) {
+ this.delimiter = delimiter;
+ this.typeTag = typeTag;
+ }
+
public DelimitedUTF8StringBinaryTokenizer(char delimiter) {
this.delimiter = delimiter;
+ this.typeTag = -1;
}
@Override
@@ -88,6 +95,9 @@
@Override
public void writeToken(DataOutput dos) throws IOException {
+        if (typeTag > 0) {
+            dos.write(typeTag);
+        }
+
// WARNING: 2-byte length indicator is specific to UTF-8
dos.writeShort((short) tokenLength);
dos.write(data, tokenStart, tokenLength);
@@ -97,4 +107,10 @@
public RecordDescriptor getTokenSchema() {
return tokenSchema;
}
+
+    // The token count is not known up front for this tokenizer; -1 signals "unknown".
+ @Override
+ public int getNumTokens() {
+ return -1;
+ }
}
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
index e3e0be3..2e85db5 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/DelimitedUTF8StringBinaryTokenizerFactory.java
@@ -22,13 +22,20 @@
private static final long serialVersionUID = 1L;
private final char delimiter;
+ private final byte typeTag;
+
+ public DelimitedUTF8StringBinaryTokenizerFactory(char delimiter, byte typeTag) {
+ this.delimiter = delimiter;
+ this.typeTag = typeTag;
+ }
public DelimitedUTF8StringBinaryTokenizerFactory(char delimiter) {
this.delimiter = delimiter;
+ this.typeTag = -1;
}
@Override
public IBinaryTokenizer createBinaryTokenizer() {
- return new DelimitedUTF8StringBinaryTokenizer(delimiter);
+ return new DelimitedUTF8StringBinaryTokenizer(delimiter, typeTag);
}
}
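
A sketch of the new type-tag path (the tag byte value is hypothetical): with a positive typeTag, every token written by writeToken is prefixed by that byte before the 2-byte length and the UTF-8 payload, while the single-argument constructors keep the old untagged format via the -1 sentinel.

    // Hypothetical tag value; any positive byte passes the typeTag > 0 check.
    byte stringTypeTag = (byte) 's';
    IBinaryTokenizer tokenizer =
            new DelimitedUTF8StringBinaryTokenizerFactory(' ', stringTypeTag).createBinaryTokenizer();
    // writeToken(dos) now emits: [typeTag][2-byte length][token bytes]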
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
index 54fc371..2cc0b6c 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/tokenizers/HashedQGramUTF8StringBinaryTokenizer.java
@@ -142,4 +142,9 @@
public RecordDescriptor getTokenSchema() {
return tokenSchema;
}
-}
+
+    // Like the delimited tokenizer, this one does not report a token count up front.
+    @Override
+ public int getNumTokens() {
+ return 0;
+ }
+}
\ No newline at end of file