Adding IPredicateEvaluator for issue 423 at hyracks level
Author: pouria.pirzadeh@gmail.com <pouria.pirzadeh@gmail.com@123451ca-8445-de46-9d55-352943316053>
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index dde4443..e0ee1f0 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -37,6 +37,7 @@
import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
public abstract class AbstractCompilerFactoryBuilder {
@@ -50,6 +51,7 @@
protected IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
protected IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
protected IPrinterFactoryProvider printerProvider;
+ protected IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
protected IExpressionRuntimeProvider expressionRuntimeProvider;
protected IExpressionTypeComputer expressionTypeComputer;
protected INullableTypeComputer nullableTypeComputer;
@@ -111,6 +113,14 @@
public IBinaryComparatorFactoryProvider getComparatorFactoryProvider() {
return comparatorFactoryProvider;
}
+
+ public void setPredicateEvaluatorFactoryProvider(IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider) {
+ this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
+ }
+
+ public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactory() {
+ return predEvaluatorFactoryProvider;
+ }
public void setBinaryBooleanInspectorFactory(IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory) {
this.binaryBooleanInspectorFactory = binaryBooleanInspectorFactory;
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 2b24bd0..1c62307 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -91,7 +91,7 @@
binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider,
nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
- partialAggregationTypeComputer, frameSize, clusterLocations);
+ partialAggregationTypeComputer, predEvaluatorFactoryProvider, frameSize, clusterLocations);
PlanCompiler pc = new PlanCompiler(context);
return pc.compilePlan(plan, null, jobEventListenerFactory);
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 6da42b4..32089e2 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -40,6 +40,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -106,6 +108,10 @@
Object t = env.getVarType(v);
comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
}
+
+ IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
+ IPredicateEvaluatorFactory predEvaluatorFactory = predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
+
RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
propagatedSchema, context);
IOperatorDescriptorRegistry spec = builder.getJobSpec();
@@ -125,7 +131,7 @@
case INNER: {
opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
- hashFunFactories, comparatorFactories, recDescriptor);
+ hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory);
break;
}
case LEFT_OUTER: {
@@ -135,7 +141,7 @@
}
opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
- hashFunFactories, comparatorFactories, recDescriptor, true, nullWriterFactories);
+ hashFunFactories, comparatorFactories, recDescriptor, predEvaluatorFactory, true, nullWriterFactories);
break;
}
default: {
@@ -153,7 +159,7 @@
maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
- keysRight, keysLeft));
+ keysRight, keysLeft), predEvaluatorFactory);
break;
}
case LEFT_OUTER: {
@@ -165,7 +171,7 @@
maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
- keysRight, keysLeft), true, nullWriterFactories);
+ keysRight, keysLeft), predEvaluatorFactory, true, nullWriterFactories);
break;
}
default: {
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index b09c194..a737f90 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -37,6 +37,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
@@ -86,6 +88,10 @@
Object t = env.getVarType(v);
comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
}
+
+ IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider = context.getPredicateEvaluatorFactoryProvider();
+ IPredicateEvaluatorFactory predEvaluatorFactory = predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
+
RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
propagatedSchema, context);
IOperatorDescriptorRegistry spec = builder.getJobSpec();
@@ -94,7 +100,7 @@
switch (kind) {
case INNER: {
opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
- comparatorFactories, recDescriptor, tableSize);
+ comparatorFactories, recDescriptor, tableSize, predEvaluatorFactory);
break;
}
case LEFT_OUTER: {
@@ -103,7 +109,7 @@
nullWriterFactories[j] = context.getNullWriterFactory();
}
opDesc = new InMemoryHashJoinOperatorDescriptor(spec, keysLeft, keysRight, hashFunFactories,
- comparatorFactories, recDescriptor, true, nullWriterFactories, tableSize);
+ comparatorFactories, predEvaluatorFactory, recDescriptor, true, nullWriterFactories, tableSize);
break;
}
default: {
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 365d1a5..245e5e1 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -41,6 +41,8 @@
import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
public class JobGenContext {
private final IOperatorSchema outerFlowSchema;
@@ -61,6 +63,7 @@
private final IExpressionTypeComputer expressionTypeComputer;
private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
+ private final IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
private final int frameSize;
private AlgebricksPartitionConstraint clusterLocations;
private int varCounter;
@@ -86,7 +89,7 @@
ITypingContext typingContext,
IExpressionEvalSizeComputer expressionEvalSizeComputer,
IPartialAggregationTypeComputer partialAggregationTypeComputer,
- int frameSize, AlgebricksPartitionConstraint clusterLocations) {
+ IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider, int frameSize, AlgebricksPartitionConstraint clusterLocations) {
this.outerFlowSchema = outerFlowSchema;
this.metadataProvider = metadataProvider;
this.appContext = appContext;
@@ -106,6 +109,7 @@
this.typingContext = typingContext;
this.expressionEvalSizeComputer = expressionEvalSizeComputer;
this.partialAggregationTypeComputer = partialAggregationTypeComputer;
+ this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
this.frameSize = frameSize;
this.varCounter = 0;
}
@@ -157,6 +161,10 @@
public IPrinterFactoryProvider getPrinterFactoryProvider() {
return printerFactoryProvider;
}
+
+ public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider(){
+ return predEvaluatorFactoryProvider;
+ }
public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
return expressionRuntimeProvider;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java
new file mode 100644
index 0000000..5f022ca
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluator.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+
+public interface IPredicateEvaluator {
+ public boolean evaluate(IFrameTupleAccessor fta0, int tupId0, IFrameTupleAccessor fta1, int tupId1);
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
new file mode 100644
index 0000000..6bf82da
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IPredicateEvaluatorFactory extends Serializable {
+ public IPredicateEvaluator createPredicateEvaluator();
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
new file mode 100644
index 0000000..5646a34
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface IPredicateEvaluatorFactoryProvider extends Serializable{
+ public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys0, int[] keys1);
+}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 6c58d6f..0bb514e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -22,6 +22,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -42,13 +44,14 @@
private final double factor;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final IPredicateEvaluatorFactory predEvaluatorFactory;
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
int recordsPerFrame, double factor, int[] keys0, int[] keys1,
IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor) {
+ RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory) {
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
@@ -58,6 +61,7 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.predEvaluatorFactory = predEvalFactory;
this.isLeftOuter = false;
this.nullWriterFactories1 = null;
recordDescriptors[0] = recordDescriptor;
@@ -66,7 +70,7 @@
public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
int recordsPerFrame, double factor, int[] keys0, int[] keys1,
IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+ RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1, IPredicateEvaluatorFactory predEvalFactory) {
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
@@ -76,6 +80,7 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.predEvaluatorFactory = predEvalFactory;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
recordDescriptors[0] = recordDescriptor;
@@ -143,12 +148,13 @@
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(rpartAid, 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(spartAid, 0);
int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
-
+ final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator() );
+
return new GraceHashJoinOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
RPARTITION_ACTIVITY_ID), partition), new TaskId(new ActivityId(getOperatorId(),
SPARTITION_ACTIVITY_ID), partition), recordsPerFrame, factor, keys0, keys1, hashFunctionFactories,
comparatorFactories, nullWriterFactories1, rd1, rd0, recordDescriptors[0], numPartitions,
- isLeftOuter);
+ predEvaluator, isLeftOuter);
}
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
index 91f509d..7c9bd88 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -51,12 +52,13 @@
private final double factor;
private final int numPartitions;
private final boolean isLeftOuter;
+ private final IPredicateEvaluator predEvaluator;
GraceHashJoinOperatorNodePushable(IHyracksTaskContext ctx, Object state0Id, Object state1Id, int recordsPerFrame,
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
IBinaryComparatorFactory[] comparatorFactories, INullWriterFactory[] nullWriterFactories,
RecordDescriptor rd1, RecordDescriptor rd0, RecordDescriptor outRecordDescriptor, int numPartitions,
- boolean isLeftOuter) {
+ IPredicateEvaluator predEval, boolean isLeftOuter) {
this.ctx = ctx;
this.state0Id = state0Id;
this.state1Id = state1Id;
@@ -70,6 +72,7 @@
this.numPartitions = numPartitions;
this.recordsPerFrame = recordsPerFrame;
this.factor = factor;
+ this.predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
}
@@ -114,7 +117,7 @@
table.reset();
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1), hpcRep1,
- new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table);
+ new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter, nullWriters1, table, predEvaluator);
// build
if (buildWriter != null) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 4f9b987..ea9414b 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -29,6 +29,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
@@ -66,6 +68,7 @@
private final int[] keys1;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final IPredicateEvaluatorFactory predEvaluatorFactory;
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
@@ -87,7 +90,7 @@
public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
int recordsPerFrame, double factor, int[] keys0, int[] keys1,
IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor) throws HyracksDataException {
+ RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory) throws HyracksDataException {
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
@@ -97,6 +100,7 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.predEvaluatorFactory = predEvalFactory;
this.isLeftOuter = false;
this.nullWriterFactories1 = null;
recordDescriptors[0] = recordDescriptor;
@@ -105,7 +109,7 @@
public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
int recordsPerFrame, double factor, int[] keys0, int[] keys1,
IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1)
+ RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1)
throws HyracksDataException {
super(spec, 2, 1);
this.memsize = memsize;
@@ -116,6 +120,7 @@
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.predEvaluatorFactory = predEvalFactory;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
recordDescriptors[0] = recordDescriptor;
@@ -189,6 +194,7 @@
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
}
}
+ final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
@@ -315,7 +321,7 @@
state.joiner = new InMemoryHashJoin(ctx, tableSize,
new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
- comparators), isLeftOuter, nullWriters1, table);
+ comparators), isLeftOuter, nullWriters1, table, predEvaluator);
bufferForPartitions = new ByteBuffer[state.nPartitions];
state.fWriters = new RunFileWriter[state.nPartitions];
for (int i = 0; i < state.nPartitions; i++) {
@@ -377,6 +383,7 @@
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
}
}
+ final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private BuildAndPartitionTaskState state;
@@ -500,7 +507,7 @@
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(
ctx.getFrameSize(), rd0), hpcRep0, new FrameTupleAccessor(ctx.getFrameSize(), rd1),
hpcRep1, new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
- nullWriters1, table);
+ nullWriters1, table, predEvaluator);
if (buildWriter != null) {
RunFileReader buildReader = buildWriter.createReader();
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
index ebf3848..2d49190 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -47,18 +48,19 @@
private final int tableSize;
private final TuplePointer storedTuplePointer;
private final boolean reverseOutputOrder; //Should we reverse the order of tuples, we are writing in output
+ private final IPredicateEvaluator predEvaluator;
public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
- ISerializableTable table) throws HyracksDataException {
- this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, false);
+ ISerializableTable table, IPredicateEvaluator predEval) throws HyracksDataException {
+ this(ctx, tableSize, accessor0, tpc0, accessor1, tpc1, comparator, isLeftOuter, nullWriters1, table, predEval, false);
}
public InMemoryHashJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
ITuplePartitionComputer tpc0, FrameTupleAccessor accessor1, ITuplePartitionComputer tpc1,
FrameTuplePairComparator comparator, boolean isLeftOuter, INullWriter[] nullWriters1,
- ISerializableTable table, boolean reverse) throws HyracksDataException {
+ ISerializableTable table, IPredicateEvaluator predEval, boolean reverse) throws HyracksDataException {
this.tableSize = tableSize;
this.table = table;
storedTuplePointer = new TuplePointer();
@@ -71,6 +73,7 @@
tpComparator = comparator;
outBuffer = ctx.allocateFrame();
appender.reset(outBuffer, true);
+ predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
if (isLeftOuter) {
int fieldCountOuter = accessor1.getFieldCount();
@@ -121,7 +124,6 @@
}
} while (true);
}
-
if (!matchFound && isLeftOuter) {
if (!appender.appendConcat(accessorProbe, i, nullTupleBuild.getFieldEndOffsets(),
nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize())) {
@@ -133,7 +135,6 @@
+ appender.getBuffer().capacity() + ")");
}
}
-
}
}
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index e0a5613..705923a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -29,6 +29,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -53,18 +55,20 @@
private final int[] keys1;
private final IBinaryHashFunctionFactory[] hashFunctionFactories;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final IPredicateEvaluatorFactory predEvaluatorFactory;
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
private final int tableSize;
public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor, int tableSize) {
+ RecordDescriptor recordDescriptor, int tableSize, IPredicateEvaluatorFactory predEvalFactory) {
super(spec, 2, 1);
this.keys0 = keys0;
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.predEvaluatorFactory = predEvalFactory;
recordDescriptors[0] = recordDescriptor;
this.isLeftOuter = false;
this.nullWriterFactories1 = null;
@@ -73,18 +77,34 @@
public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+ IPredicateEvaluatorFactory predEvalFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
int tableSize) {
super(spec, 2, 1);
this.keys0 = keys0;
this.keys1 = keys1;
this.hashFunctionFactories = hashFunctionFactories;
this.comparatorFactories = comparatorFactories;
+ this.predEvaluatorFactory = predEvalFactory;
recordDescriptors[0] = recordDescriptor;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
this.tableSize = tableSize;
}
+
+ public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
+ IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, int tableSize) {
+ this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories, recordDescriptor, tableSize, null);
+ }
+
+ public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
+ IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1,
+ int tableSize) {
+ this(spec, keys0, keys1, hashFunctionFactories, comparatorFactories,null,recordDescriptor,isLeftOuter,nullWriterFactories1,tableSize);
+ }
+
+
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
@@ -150,6 +170,7 @@
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
}
}
+ final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private HashBuildTaskState state;
@@ -166,7 +187,7 @@
state.joiner = new InMemoryHashJoin(ctx, tableSize,
new FrameTupleAccessor(ctx.getFrameSize(), rd0), hpc0, new FrameTupleAccessor(
ctx.getFrameSize(), rd1), hpc1, new FrameTuplePairComparator(keys0, keys1,
- comparators), isLeftOuter, nullWriters1, table);
+ comparators), isLeftOuter, nullWriters1, table, predEvaluator);
}
@Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
index 546e80f..9496b76 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -47,9 +48,10 @@
private final RunFileWriter runFileWriter;
private final boolean isLeftOuter;
private final ArrayTupleBuilder nullTupleBuilder;
-
+ private final IPredicateEvaluator predEvaluator;
+
public NestedLoopJoin(IHyracksTaskContext ctx, FrameTupleAccessor accessor0, FrameTupleAccessor accessor1,
- ITuplePairComparator comparators, int memSize, boolean isLeftOuter, INullWriter[] nullWriters1)
+ ITuplePairComparator comparators, int memSize, IPredicateEvaluator predEval, boolean isLeftOuter, INullWriter[] nullWriters1)
throws HyracksDataException {
this.accessorInner = accessor1;
this.accessorOuter = accessor0;
@@ -60,6 +62,7 @@
this.appender.reset(outBuffer, true);
this.outBuffers = new ArrayList<ByteBuffer>();
this.memSize = memSize;
+ this.predEvaluator = predEval;
this.ctx = ctx;
this.isLeftOuter = isLeftOuter;
@@ -130,8 +133,9 @@
boolean matchFound = false;
for (int j = 0; j < tupleCount1; ++j) {
int c = compare(accessorOuter, i, accessorInner, j);
- if (c == 0) {
- matchFound = true;
+ boolean prdEval = (predEvaluator == null) || (predEvaluator.evaluate(accessorOuter, i, accessorInner, j));
+ if (c == 0 && prdEval) {
+ matchFound = true;
if (!appender.appendConcat(accessorOuter, i, accessorInner, j)) {
flushFrame(outBuffer, writer);
appender.reset(outBuffer, true);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 0be01c1..d3f664e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -27,6 +27,8 @@
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
@@ -49,16 +51,24 @@
private static final long serialVersionUID = 1L;
private final ITuplePairComparatorFactory comparatorFactory;
private final int memSize;
+ private final IPredicateEvaluatorFactory predEvaluatorFactory;
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
-
+
public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+ this(spec, comparatorFactory, recordDescriptor, memSize, null, isLeftOuter, nullWriterFactories1);
+ }
+
+ public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ ITuplePairComparatorFactory comparatorFactory, RecordDescriptor recordDescriptor, int memSize,
+ IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
super(spec, 2, 1);
this.comparatorFactory = comparatorFactory;
this.recordDescriptors[0] = recordDescriptor;
this.memSize = memSize;
+ this.predEvaluatorFactory = predEvalFactory;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
}
@@ -117,7 +127,8 @@
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
-
+ final IPredicateEvaluator predEvaluator = ( (predEvaluatorFactory != null) ? predEvaluatorFactory.createPredicateEvaluator() : null);
+
final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
for (int i = 0; i < nullWriterFactories1.length; i++) {
@@ -134,7 +145,7 @@
partition));
state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
- new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize, isLeftOuter,
+ new FrameTupleAccessor(ctx.getFrameSize(), rd1), comparator, memSize, predEvaluator, isLeftOuter,
nullWriters1);
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index c81bf54..993ad5a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -9,6 +9,7 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -53,7 +54,8 @@
private RunFileWriter[] buildRFWriters; //writing spilled build partitions
private RunFileWriter[] probeRFWriters; //writing spilled probe partitions
-
+
+ private final IPredicateEvaluator predEvaluator;
private final boolean isLeftOuter;
private final INullWriter[] nullWriters1;
@@ -78,15 +80,15 @@
private ByteBuffer[] sPartBuffs; //Buffers for probe spilled partitions (one buffer per spilled partition)
private ByteBuffer probeResBuff; //Buffer for probe resident partition tuples
private ByteBuffer reloadBuffer; //Buffer for reloading spilled partitions during partition tuning
-
+
private int[] buildPSizeInFrames; //Used for partition tuning
private int freeFramesCounter; //Used for partition tuning
private boolean isTableEmpty; //Added for handling the case, where build side is empty (tableSize is 0)
-
+
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
- RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc) {
+ RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc, IPredicateEvaluator predEval) {
this.ctx = ctx;
this.memForJoin = memForJoin;
this.buildRd = buildRd;
@@ -106,6 +108,7 @@
this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
+ this.predEvaluator = predEval;
this.isLeftOuter = false;
this.nullWriters1 = null;
@@ -114,7 +117,7 @@
public OptimizedHybridHashJoin(IHyracksTaskContext ctx, int memForJoin, int numOfPartitions, String rel0Name,
String rel1Name, int[] keys0, int[] keys1, IBinaryComparator[] comparators, RecordDescriptor buildRd,
RecordDescriptor probeRd, ITuplePartitionComputer probeHpc, ITuplePartitionComputer buildHpc,
- boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
+ IPredicateEvaluator predEval, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
this.ctx = ctx;
this.memForJoin = memForJoin;
this.buildRd = buildRd;
@@ -133,7 +136,8 @@
this.accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), buildRd);
this.accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), probeRd);
-
+
+ this.predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
this.nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
@@ -423,7 +427,7 @@
this.inMemJoiner = new InMemoryHashJoin(ctx, inMemTupCount,
new FrameTupleAccessor(ctx.getFrameSize(), probeRd), probeHpc, new FrameTupleAccessor(
ctx.getFrameSize(), buildRd), buildHpc, new FrameTuplePairComparator(probeKeys, buildKeys,
- comparators), isLeftOuter, nullWriters1, table);
+ comparators), isLeftOuter, nullWriters1, table, predEvaluator);
}
private void cacheInMemJoin() throws HyracksDataException {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 7067080..01c331a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -30,6 +30,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluator;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
@@ -111,7 +113,8 @@
private final IBinaryComparatorFactory[] comparatorFactories; //For in-mem HJ
private final ITuplePairComparatorFactory tuplePairComparatorFactory0; //For NLJ in probe
private final ITuplePairComparatorFactory tuplePairComparatorFactory1; //For NLJ in probe
-
+ private final IPredicateEvaluatorFactory predEvaluatorFactory;
+
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
@@ -119,7 +122,7 @@
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
ITuplePairComparatorFactory tupPaircomparatorFactory0,
- ITuplePairComparatorFactory tupPaircomparatorFactory1, boolean isLeftOuter,
+ ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory, boolean isLeftOuter,
INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
super(spec, 2, 1);
@@ -133,6 +136,7 @@
this.tuplePairComparatorFactory0 = tupPaircomparatorFactory0;
this.tuplePairComparatorFactory1 = tupPaircomparatorFactory1;
recordDescriptors[0] = recordDescriptor;
+ this.predEvaluatorFactory = predEvaluatorFactory;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
@@ -142,7 +146,7 @@
public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
- ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1)
+ ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory)
throws HyracksDataException {
super(spec, 2, 1);
@@ -155,6 +159,7 @@
this.comparatorFactories = comparatorFactories;
this.tuplePairComparatorFactory0 = tupPaircomparatorFactory0;
this.tuplePairComparatorFactory1 = tupPaircomparatorFactory1;
+ this.predEvaluatorFactory = predEvaluatorFactory;
recordDescriptors[0] = recordDescriptor;
this.isLeftOuter = false;
this.nullWriterFactories1 = null;
@@ -255,6 +260,8 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
+ final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
@@ -276,12 +283,12 @@
if(!isLeftOuter){
state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
- buildHpc);
+ buildHpc, predEvaluator);
}
else{
state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
- buildHpc, isLeftOuter, nullWriterFactories1);
+ buildHpc, predEvaluator, isLeftOuter, nullWriterFactories1);
}
state.hybridHJ.initBuild();
@@ -335,7 +342,8 @@
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx);
final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx);
-
+ final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+
for (int i = 0; i < comparatorFactories.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
@@ -456,7 +464,7 @@
nPartitions);
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
- probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc);
+ probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator);
buildSideReader.open();
rHHj.initBuild();
@@ -518,7 +526,7 @@
nPartitions);
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
- buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc);
+ buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator);
probeSideReader.open();
rHHj.initBuild();
@@ -586,7 +594,7 @@
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
buildRDesc), hpcRepSmaller, new FrameTuplePairComparator(pKeys, bKeys, comparators),
- isLeftOuter, nullWriters1, table, reverse);
+ isLeftOuter, nullWriters1, table, predEvaluator, reverse);
bReader.open();
rPartbuff.clear();
@@ -614,7 +622,7 @@
throws HyracksDataException {
NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
- new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, false, null);
+ new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, false, null);
ByteBuffer cacheBuff = ctx.allocateFrame();
innerReader.open();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
index 0999ee8..b7fd241 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOptimizedHybridHashJoinTest.java
@@ -90,7 +90,7 @@
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
custOrderJoinDesc, new JoinComparatorFactory(
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
- new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0));
+ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
@@ -168,7 +168,7 @@
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
custOrderJoinDesc, new JoinComparatorFactory(
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
- new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0));
+ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
@@ -247,7 +247,7 @@
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
custOrderJoinDesc, new JoinComparatorFactory(
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
- new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0));
+ new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index b5eb850..9b35867 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -122,7 +122,7 @@
new int[] { 0 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, 128);
+ custOrderJoinDesc, 128, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -205,7 +205,7 @@
new int[] { 0 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc);
+ custOrderJoinDesc, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -288,7 +288,7 @@
new int[] { 0 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc);
+ custOrderJoinDesc, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -372,7 +372,7 @@
new int[] { 1 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, true, nullWriterFactories, 128);
+ null, custOrderJoinDesc, true, nullWriterFactories, 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -460,7 +460,7 @@
new int[] { 1 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, true, nullWriterFactories);
+ custOrderJoinDesc, true, nullWriterFactories, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -548,7 +548,7 @@
new int[] { 1 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, true, nullWriterFactories);
+ custOrderJoinDesc, null, true, nullWriterFactories);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -629,7 +629,7 @@
new int[] { 0 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, 128);
+ custOrderJoinDesc, 128, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -720,7 +720,7 @@
new int[] { 0 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc);
+ custOrderJoinDesc, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -811,7 +811,7 @@
new int[] { 0 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc);
+ custOrderJoinDesc, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -898,7 +898,7 @@
new int[] { 0 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, 128);
+ custOrderJoinDesc, 128, null);
PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
ResultSetId rsId = new ResultSetId(1);
@@ -991,7 +991,7 @@
new int[] { 0 },
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, 128);
+ custOrderJoinDesc, 128, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
diff --git a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
index 6df9ff8..8dab4de 100644
--- a/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
+++ b/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/edu/uci/ics/hyracks/examples/tpch/client/Main.java
@@ -210,7 +210,7 @@
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc);
+ custOrderJoinDesc, null);
} else if ("hybridhash".equalsIgnoreCase(algo)) {
join = new HybridHashJoinOperatorDescriptor(
@@ -224,7 +224,7 @@
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc);
+ custOrderJoinDesc, null);
} else {
join = new InMemoryHashJoinOperatorDescriptor(
@@ -234,7 +234,7 @@
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) },
new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
- custOrderJoinDesc, 6000000);
+ custOrderJoinDesc, 6000000, null);
}
PartitionConstraintHelper.addPartitionCountConstraint(spec, join, numJoinPartitions);