Merged in the Hybrid Hash Join operator. Added integration tests for HHJ.
git-svn-id: https://hyracks.googlecode.com/svn/trunk@29 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/HybridHashJoinOperatorDescriptor.java
new file mode 100644
index 0000000..1462f52
--- /dev/null
+++ b/hyracks/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/join/HybridHashJoinOperatorDescriptor.java
@@ -0,0 +1,603 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.coreops.join;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePullable;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
+import edu.uci.ics.hyracks.api.job.JobPlan;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.context.HyracksContext;
+import edu.uci.ics.hyracks.coreops.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.coreops.RepartitionComputerFactory;
+import edu.uci.ics.hyracks.coreops.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.coreops.base.AbstractOperatorDescriptor;
+
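+/**
+ * Hybrid hash join operator. The build activity consumes input 0 (the build
+ * relation R): tuples that hash into the in-memory portion are inserted into
+ * an in-memory hash table, while the remaining B partitions are spilled to
+ * temporary files. The probe activity consumes input 1 (the probe relation S),
+ * probes the in-memory table, spills the rest into matching S partitions, and
+ * finally joins each spilled R/S partition pair with a fresh in-memory hash
+ * join.
+ */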
+public class HybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final String JOINER0 = "joiner0";
+ private static final String SMALLRELATION = "RelR";
+ private static final String LARGERELATION = "RelS";
+ private static final String MEM_HASHTABLE = "MEMORY_HASHTABLE";
+ private static final String NUM_PARTITION = "NUMBER_B_PARTITIONS"; // B
+ private final int memsize;
+ private static final long serialVersionUID = 1L;
+ private final int inputsize0;
+ private final double factor;
+ private final int[] keys0;
+ private final int[] keys1;
+ private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+ private final IBinaryComparatorFactory[] comparatorFactories;
+ private final int recordsPerFrame;
+
+ private int numReadI1 = 0;
+ private int numWriteI1 = 0;
+ private int numReadI2 = 0;
+ private int numWriteI2 = 0;
+
+ /**
+ * @param spec
+ * the job specification this operator belongs to
+ * @param memsize
+ * memory budget for the join, in frames
+ * @param inputsize0
+ * estimated size of the build input (R), in frames
+ * @param recordsPerFrame
+ * estimated number of records per frame, used to size the in-memory hash tables
+ * @param factor
+ * fudge factor applied to the build-input size estimate
+ * @param keys0
+ * join key fields of the build input (R)
+ * @param keys1
+ * join key fields of the probe input (S)
+ * @param hashFunctionFactories
+ * hash function factories for the join key fields
+ * @param comparatorFactories
+ * comparator factories for the join key fields
+ * @param recordDescriptor
+ * descriptor of the joined output records
+ * @throws HyracksDataException
+ */
+ public HybridHashJoinOperatorDescriptor(JobSpecification spec, int memsize, int inputsize0, int recordsPerFrame,
+ double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
+ throws HyracksDataException {
+ super(spec, 2, 1);
+ this.memsize = memsize;
+ this.inputsize0 = inputsize0;
+ this.factor = factor;
+ this.recordsPerFrame = recordsPerFrame;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.comparatorFactories = comparatorFactories;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ @Override
+ public void contributeTaskGraph(IActivityGraphBuilder builder) {
+ BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(SMALLRELATION);
+ PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(LARGERELATION);
+
+ builder.addTask(phase1);
+ builder.addSourceEdge(0, phase1, 0);
+
+ builder.addTask(phase2);
+ builder.addSourceEdge(1, phase2, 0);
+
+ builder.addBlockingEdge(phase1, phase2);
+
+ builder.addTargetEdge(0, phase2, 0);
+ }
+
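+ /*
+ * Build phase: partition the build input (R); the first memoryForHashtable
+ * partitions go into an in-memory hash table, the remaining B partitions
+ * are spilled to per-partition temporary files.
+ */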
+ private class BuildAndPartitionActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+ private String relationName;
+
+ public BuildAndPartitionActivityNode(String relationName) {
+ super();
+ this.relationName = relationName;
+
+ }
+
+ @Override
+ public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
+ int partition, int nPartitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
+ final IOperatorEnvironment env, int partition, final int nPartitions) {
+ final RecordDescriptor rd0 = plan.getJobSpecification()
+ .getOperatorInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor rd1 = plan.getJobSpecification()
+ .getOperatorInputRecordDescriptor(getOperatorId(), 1);
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+
+ IOperatorNodePushable op = new IOperatorNodePushable() {
+ private InMemoryHashJoin joiner0;
+ private final FrameTupleAccessor accessor0 = new FrameTupleAccessor(ctx, rd0);
+ ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
+ .createPartitioner();
+ private final FrameTupleAppender appender = new FrameTupleAppender(ctx);
+ private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx);
+ private ByteBuffer[] bufferForPartitions;
+ private final ByteBuffer inBuffer = ctx.getResourceManager().allocateFrame();
+ private File[] files;
+ private FileChannel[] channels;
+ private int memoryForHashtable;
+ private int B;
+
+ @Override
+ public void setFrameWriter(int index, IFrameWriter writer) {
+ throw new IllegalArgumentException();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
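+ // Flush the last buffered frame into the in-memory hash table (if one
+ // exists), write out any non-empty partition buffers, and hand the spill
+ // channels, the joiner, and the partitioning parameters to the probe phase.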
+ if (memoryForHashtable != 0)
+ build(inBuffer);
+
+ for (int i = 0; i < B; i++) {
+ try {
+ ByteBuffer buf = bufferForPartitions[i];
+ accessor0.reset(buf);
+ if (accessor0.getTupleCount() > 0) {
+ FileChannel wChannel = channels[i];
+ if (wChannel == null) {
+ wChannel = new RandomAccessFile(files[i], "rw").getChannel();
+ channels[i] = wChannel;
+ }
+ wChannel.write(buf);
+ numWriteI1++;
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ env.set(relationName, channels);
+ env.set(JOINER0, joiner0);
+ env.set(NUM_PARTITION, B);
+ env.set(MEM_HASHTABLE, memoryForHashtable);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+
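+ // Unless the whole build input fits in memory, route each tuple either
+ // into the in-memory hash table (partitions below memoryForHashtable) or
+ // into one of the B spill partitions, writing a partition buffer to its
+ // file whenever it fills up.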
+ if (memoryForHashtable != memsize - 2) {
+ accessor0.reset(buffer);
+ int tCount = accessor0.getTupleCount();
+ for (int i = 0; i < tCount; ++i) {
+ int entry = -1;
+ if (memoryForHashtable == 0) {
+ entry = hpc0.partition(accessor0, i, B);
+ boolean newBuffer = false;
+ ByteBuffer bufBi = bufferForPartitions[entry];
+ while (true) {
+ appender.reset(bufBi, newBuffer);
+ if (appender.append(accessor0, i)) {
+ break;
+ } else {
+ try {
+ FileChannel wChannel = channels[entry];
+ if (wChannel == null) {
+ wChannel = new RandomAccessFile(files[entry], "rw").getChannel();
+ channels[entry] = wChannel;
+ }
+ wChannel.write(bufBi);
+ numWriteI1++;
+ bufBi.clear();
+ newBuffer = true;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ } else {
+ entry = hpc0.partition(accessor0, i, (int) (inputsize0 * factor / nPartitions));
+ if (entry < memoryForHashtable) {
+ while (true) {
+ if (!ftappender.append(accessor0, i)) {
+ build(inBuffer);
+
+ ftappender.reset(inBuffer, true);
+ } else
+ break;
+ }
+ } else {
+ entry %= B;
+ boolean newBuffer = false;
+ ByteBuffer bufBi = bufferForPartitions[entry];
+ while (true) {
+ appender.reset(bufBi, newBuffer);
+ if (appender.append(accessor0, i)) {
+ break;
+ } else {
+ try {
+ FileChannel wChannel;
+ if (channels[entry] == null) {
+ wChannel = new RandomAccessFile(files[entry], "rw").getChannel();
+ channels[entry] = wChannel;
+ } else {
+ wChannel = channels[entry];
+ }
+ wChannel.write(bufBi);
+ numWriteI1++;
+ bufBi.clear();
+ newBuffer = true;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ }
+ }
+
+ }
+ } else {
+ build(buffer);
+ }
+
+ }
+
+ private void build(ByteBuffer inBuffer) throws HyracksDataException {
+ ByteBuffer copyBuffer = ctx.getResourceManager().allocateFrame();
+ FrameUtils.copy(inBuffer, copyBuffer);
+ joiner0.build(copyBuffer);
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
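+ // Derive the number of spill partitions B and the memory left for the
+ // in-memory hash table from the memory budget, the estimated build-input
+ // size, and the fudge factor; B == 0 degenerates to a pure in-memory
+ // hash join.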
+ if (memsize > 1) {
+ if (memsize > inputsize0) {
+ B = 0;
+ } else {
+ B = (int) (Math.ceil((double) (inputsize0 * factor / nPartitions - memsize)
+ / (double) (memsize - 1)));
+ }
+ if (B <= 0) {
+ // becomes in-memory HJ
+ memoryForHashtable = memsize - 2;
+ B = 0;
+ } else {
+ memoryForHashtable = memsize - B - 2;
+ if (memoryForHashtable < 0) {
+ memoryForHashtable = 0;
+ B = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
+ }
+ }
+ } else {
+ throw new HyracksDataException("not enough memory");
+ }
+
+ ITuplePartitionComputer hpc0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories)
+ .createPartitioner();
+ ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
+ .createPartitioner();
+ int tableSize = (int) (memoryForHashtable * recordsPerFrame * factor);
+ joiner0 = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(ctx, rd0), hpc0,
+ new FrameTupleAccessor(ctx, rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
+ comparators));
+ files = new File[B];
+ channels = new FileChannel[B];
+ bufferForPartitions = new ByteBuffer[B];
+ for (int i = 0; i < B; i++) {
+ try {
+ files[i] = ctx.getResourceManager().createFile(relationName, null);
+ files[i].deleteOnExit();
+ bufferForPartitions[i] = ctx.getResourceManager().allocateFrame();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ ftappender.reset(inBuffer, true);
+ }
+
+ };
+ return op;
+ }
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return HybridHashJoinOperatorDescriptor.this;
+ }
+
+ @Override
+ public boolean supportsPullInterface() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsPushInterface() {
+ return true;
+ }
+
+ }
+
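+ /*
+ * Probe phase: partition the probe input (S) the same way, probe the
+ * in-memory hash table built in the first phase, spill the rest, and then
+ * join each spilled R/S partition pair.
+ */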
+ private class PartitionAndJoinActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+ private String largeRelation;
+
+ public PartitionAndJoinActivityNode(String relationName) {
+ super();
+ this.largeRelation = relationName;
+ }
+
+ @Override
+ public IOperatorNodePullable createPullRuntime(HyracksContext ctx, JobPlan plan, IOperatorEnvironment env,
+ int partition, int nPartitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final HyracksContext ctx, JobPlan plan,
+ final IOperatorEnvironment env, int partition, final int nPartitions) {
+ final RecordDescriptor rd0 = plan.getJobSpecification()
+ .getOperatorInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor rd1 = plan.getJobSpecification()
+ .getOperatorInputRecordDescriptor(getOperatorId(), 1);
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+
+ IOperatorNodePushable op = new IOperatorNodePushable() {
+ private InMemoryHashJoin joiner0;
+ private final FrameTupleAccessor accessor1 = new FrameTupleAccessor(ctx, rd1);
+ private ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
+ hashFunctionFactories);
+ private ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
+ hashFunctionFactories);
+ ITuplePartitionComputer hpc1 = hpcf1.createPartitioner();
+
+ private final FrameTupleAppender appender = new FrameTupleAppender(ctx);
+ private final FrameTupleAppender ftap = new FrameTupleAppender(ctx);
+ private final ByteBuffer inBuffer = ctx.getResourceManager().allocateFrame();
+ private final ByteBuffer outBuffer = ctx.getResourceManager().allocateFrame();
+ private IFrameWriter writer;
+ private FileChannel[] channelsR;
+ private FileChannel[] channelsS;
+ private File filesS[];
+ private ByteBuffer[] bufferForPartitions;
+ private int B;
+ private int memoryForHashtable;
+
+ @Override
+ public void open() throws HyracksDataException {
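+ // Pick up the joiner, the R spill channels, and the partitioning
+ // parameters produced by the build phase, and allocate matching spill
+ // buffers for the probe input.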
+ joiner0 = (InMemoryHashJoin) env.get(JOINER0);
+ writer.open();
+ channelsR = (FileChannel[]) env.get(SMALLRELATION);
+ B = (Integer) env.get(NUM_PARTITION);
+ memoryForHashtable = (Integer) env.get(MEM_HASHTABLE);
+ filesS = new File[B];
+ channelsS = new FileChannel[B];
+ bufferForPartitions = new ByteBuffer[B];
+ for (int i = 0; i < B; i++) {
+ try {
+ filesS[i] = ctx.getResourceManager().createFile(largeRelation, null);
+ filesS[i].deleteOnExit();
+ bufferForPartitions[i] = ctx.getResourceManager().allocateFrame();
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ appender.reset(outBuffer, true);
+ ftap.reset(inBuffer, true);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
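+ // Tuples whose partition falls in the in-memory range are buffered and
+ // joined against the in-memory hash table; the rest are appended to the
+ // corresponding S spill partition, mirroring the routing of the build
+ // input.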
+ if (memoryForHashtable != memsize - 2) {
+ accessor1.reset(buffer);
+ int tupleCount1 = accessor1.getTupleCount();
+ for (int i = 0; i < tupleCount1; ++i) {
+
+ int entry = -1;
+ if (memoryForHashtable == 0) {
+ entry = hpc1.partition(accessor1, i, B);
+ boolean newBuffer = false;
+ ByteBuffer outbuf = bufferForPartitions[entry];
+ while (true) {
+ appender.reset(outbuf, newBuffer);
+ if (appender.append(accessor1, i)) {
+ break;
+ } else {
+ try {
+ FileChannel wChannel = channelsS[entry];
+ if (wChannel == null) {
+ wChannel = new RandomAccessFile(filesS[entry], "rw").getChannel();
+ channelsS[entry] = wChannel;
+ }
+
+ wChannel.write(outbuf);
+ numWriteI2++;
+ outbuf.clear();
+ newBuffer = true;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ } else {
+ entry = hpc1.partition(accessor1, i, (int) (inputsize0 * factor / nPartitions));
+ if (entry < memoryForHashtable) {
+ while (true) {
+ if (!ftap.append(accessor1, i)) {
+ joiner0.join(inBuffer, writer);
+ ftap.reset(inBuffer, true);
+ } else
+ break;
+ }
+
+ } else {
+ entry %= B;
+ boolean newBuffer = false;
+ ByteBuffer outbuf = bufferForPartitions[entry];
+ while (true) {
+ appender.reset(outbuf, newBuffer);
+ if (appender.append(accessor1, i)) {
+ break;
+ } else {
+ try {
+ FileChannel wChannel = channelsS[entry];
+ if (wChannel == null) {
+ wChannel = new RandomAccessFile(filesS[entry], "rw").getChannel();
+ channelsS[entry] = wChannel;
+ }
+ wChannel.write(outbuf);
+ numWriteI2++;
+ outbuf.clear();
+ newBuffer = true;
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ }
+ }
+ }
+ } else {
+ joiner0.join(buffer, writer);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
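+ // Join whatever is still buffered against the in-memory table, flush the
+ // remaining S partition buffers, and then, for each spilled partition,
+ // rebuild a hash table from the R spill file and probe it with the
+ // matching S spill file using the repartitioning computers.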
+ joiner0.join(inBuffer, writer);
+ joiner0.closeJoin(writer);
+ ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(B, hpcf0).createPartitioner();
+ ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(B, hpcf1).createPartitioner();
+ if (memoryForHashtable != memsize - 2) {
+ int[] memRi = new int[B];
+ for (int i = 0; i < B; i++) {
+ try {
+ ByteBuffer outbuf = bufferForPartitions[i];
+ accessor1.reset(outbuf);
+ if (accessor1.getTupleCount() > 0) {
+ FileChannel wChannel = channelsS[i];
+ if (wChannel == null) {
+ wChannel = new RandomAccessFile(filesS[i], "rw").getChannel();
+ channelsS[i] = wChannel;
+ }
+ wChannel.write(outbuf);
+ numWriteI2++;
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ inBuffer.clear();
+ int tableSize = -1;
+ if (memoryForHashtable == 0) {
+ tableSize = (int) (B * recordsPerFrame * factor);
+ } else {
+ tableSize = (int) (memsize * recordsPerFrame * factor);
+ }
+ for (int partitionid = 0; partitionid < B; partitionid++) {
+
+ int state = 0;
+ try {
+ FileChannel inChannel = channelsR[partitionid];
+ if (inChannel != null) {
+ inChannel.position(0);
+ InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize,
+ new FrameTupleAccessor(ctx, rd0), hpcRep0,
+ new FrameTupleAccessor(ctx, rd1), hpcRep1, new FrameTuplePairComparator(
+ keys0, keys1, comparators));
+ state = inChannel.read(inBuffer);
+ while (state != -1) {
+ numReadI1++;
+ ByteBuffer copyBuffer = ctx.getResourceManager().allocateFrame();
+ FrameUtils.copy(inBuffer, copyBuffer);
+ joiner.build(copyBuffer);
+ inBuffer.clear();
+ memRi[partitionid]++;
+ state = inChannel.read(inBuffer);
+ }
+ appender.reset(outBuffer, false);
+
+ inBuffer.clear();
+
+ FileChannel inChannelS = channelsS[partitionid];
+ if (inChannelS != null) {
+ inChannelS.position(0);
+ while (inChannelS.read(inBuffer) != -1) {
+ numReadI2++;
+ joiner.join(inBuffer, writer);
+ inBuffer.clear();
+ }
+ inChannelS.close();
+ joiner.closeJoin(writer);
+ }
+ inChannel.close();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ writer.close();
+ env.set(LARGERELATION, null);
+ env.set(SMALLRELATION, null);
+ env.set(JOINER0, null);
+ env.set(MEM_HASHTABLE, null);
+ env.set(NUM_PARTITION, null);
+
+ }
+
+ @Override
+ public void setFrameWriter(int index, IFrameWriter writer) {
+ if (index != 0) {
+ throw new IllegalStateException();
+ }
+ this.writer = writer;
+ }
+ };
+ return op;
+ }
+
+ @Override
+ public IOperatorDescriptor getOwner() {
+ return HybridHashJoinOperatorDescriptor.this;
+ }
+
+ @Override
+ public boolean supportsPullInterface() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsPushInterface() {
+ return true;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 64a59c4..bb8ac82 100644
--- a/hyracks/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -41,6 +41,7 @@
import edu.uci.ics.hyracks.coreops.file.CSVFileScanOperatorDescriptor;
import edu.uci.ics.hyracks.coreops.file.FileSplit;
import edu.uci.ics.hyracks.coreops.join.GraceHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.coreops.join.HybridHashJoinOperatorDescriptor;
import edu.uci.ics.hyracks.coreops.join.InMemoryHashJoinOperatorDescriptor;
public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
@@ -184,6 +185,74 @@
}
@Test
+ public void customerOrderCIDHybridHashJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
+
+ CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+
+ CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ custScanner.setPartitionConstraint(custPartitionConstraint);
+
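+ // Hybrid hash join on customer id: 5 frames of memory, an estimated
+ // build input of 20 frames, 200 records per frame, fudge factor 1.2.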
+ HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 200, 1.2,
+ new int[] { 1 }, new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+ PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ join.setPartitionConstraint(joinPartitionConstraint);
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ printer.setPartitionConstraint(printerPartitionConstraint);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
public void customerOrderCIDJoinMulti() throws Exception {
JobSpecification spec = new JobSpecification();
@@ -335,6 +404,82 @@
}
@Test
+ public void customerOrderCIDHybridHashJoinMulti() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE });
+
+ CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+
+ CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ custScanner.setPartitionConstraint(custPartitionConstraint);
+
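+ // Same join partitioned across two node controllers, with a tighter
+ // budget: 3 frames of memory, an estimated build input of 20 frames,
+ // 100 records per frame, fudge factor 1.2.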
+ HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 3, 20, 100, 1.2,
+ new int[] { 1 }, new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
+ new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc);
+ PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ join.setPartitionConstraint(joinPartitionConstraint);
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(
+ new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ printer.setPartitionConstraint(printerPartitionConstraint);
+
+ IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
public void customerOrderCIDJoinAutoExpand() throws Exception {
JobSpecification spec = new JobSpecification();