Fixed the nested loop join algorithm to use join memory parameter.
Change-Id: I50e24ea023c9ae9aa043698716e5021d4dafc327
Reviewed-on: https://asterix-gerrit.ics.uci.edu/953
Reviewed-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index f07fb76..68fbba4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -242,7 +242,7 @@
OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalSort(sortFrameLimit);
OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalGroupBy(groupFrameLimit);
- OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesHybridHash(joinFrameLimit);
+ OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesForJoin(joinFrameLimit);
HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder(
AqlOptimizationContextFactory.INSTANCE);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index f0f858d..7340882 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -26,8 +26,8 @@
private static final String FRAMESIZE = "FRAMESIZE";
private static final String MAX_FRAMES_EXTERNAL_SORT = "MAX_FRAMES_EXTERNAL_SORT";
private static final String MAX_FRAMES_EXTERNAL_GROUP_BY = "MAX_FRAMES_EXTERNAL_GROUP_BY";
- private static final String MAX_FRAMES_LEFT_INPUT_HYBRID_HASH = "MAX_FRAMES_LEFT_INPUT_HYBRID_HASH";
- private static final String MAX_FRAMES_HYBRID_HASH = "MAX_FRAMES_HYBRID_HASH";
+ private static final String MAX_FRAMES_FOR_JOIN_LEFT_INPUT = "MAX_FRAMES_FOR_JOIN_LEFT_INPUT";
+ private static final String MAX_FRAMES_FOR_JOIN = "MAX_FRAMES_FOR_JOIN";
private static final String FUDGE_FACTOR = "FUDGE_FACTOR";
private static final String MAX_RECORDS_PER_FRAME = "MAX_RECORDS_PER_FRAME";
@@ -73,22 +73,22 @@
setInt(MAX_RECORDS_PER_FRAME, maxRecords);
}
- public int getMaxFramesLeftInputHybridHash() {
+ public int getMaxFramesForJoinLeftInput() {
int frameSize = getFrameSize();
- return getInt(MAX_FRAMES_LEFT_INPUT_HYBRID_HASH, (int) (140L * 1024 * MB / frameSize));
+ return getInt(MAX_FRAMES_FOR_JOIN_LEFT_INPUT, (int) (140L * 1024 * MB / frameSize));
}
- public void setMaxFramesLeftInputHybridHash(int frameLimit) {
- setInt(MAX_FRAMES_LEFT_INPUT_HYBRID_HASH, frameLimit);
+ public void setMaxFramesForJoinLeftInput(int frameLimit) {
+ setInt(MAX_FRAMES_FOR_JOIN_LEFT_INPUT, frameLimit);
}
- public int getMaxFramesHybridHash() {
+ public int getMaxFramesForJoin() {
int frameSize = getFrameSize();
- return getInt(MAX_FRAMES_HYBRID_HASH, (int) (64L * MB / frameSize));
+ return getInt(MAX_FRAMES_FOR_JOIN, (int) (64L * MB / frameSize));
}
- public void setMaxFramesHybridHash(int frameLimit) {
- setInt(MAX_FRAMES_HYBRID_HASH, frameLimit);
+ public void setMaxFramesForJoin(int frameLimit) {
+ setInt(MAX_FRAMES_FOR_JOIN, frameLimit);
}
public int getMaxFramesExternalGroupBy() {
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index a98e7ff..56ea55e 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -49,11 +49,13 @@
import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
public class JoinUtils {
+ private JoinUtils() {
+ }
public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, IOptimizationContext context)
throws AlgebricksException {
- List<LogicalVariable> sideLeft = new LinkedList<LogicalVariable>();
- List<LogicalVariable> sideRight = new LinkedList<LogicalVariable>();
+ List<LogicalVariable> sideLeft = new LinkedList<>();
+ List<LogicalVariable> sideRight = new LinkedList<>();
List<LogicalVariable> varsLeft = op.getInputs().get(0).getValue().getSchema();
List<LogicalVariable> varsRight = op.getInputs().get(1).getValue().getSchema();
if (isHashJoinCondition(op.getCondition().getValue(), varsLeft, varsRight, sideLeft, sideRight)) {
@@ -83,25 +85,21 @@
}
private static void setNLJoinOp(AbstractBinaryJoinOperator op, IOptimizationContext context) {
- op.setPhysicalOperator(new NLJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, context
- .getPhysicalOptimizationConfig().getMaxRecordsPerFrame()));
+ op.setPhysicalOperator(new NLJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
+ context.getPhysicalOptimizationConfig().getMaxFramesForJoin()));
}
private static void setHashJoinOp(AbstractBinaryJoinOperator op, JoinPartitioningType partitioningType,
List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IOptimizationContext context)
throws AlgebricksException {
op.setPhysicalOperator(new HybridHashJoinPOperator(op.getJoinKind(), partitioningType, sideLeft, sideRight,
- context.getPhysicalOptimizationConfig().getMaxFramesHybridHash(), context
- .getPhysicalOptimizationConfig().getMaxFramesLeftInputHybridHash(), context
- .getPhysicalOptimizationConfig().getMaxRecordsPerFrame(), context
- .getPhysicalOptimizationConfig().getFudgeFactor()));
+ context.getPhysicalOptimizationConfig().getMaxFramesForJoin(),
+ context.getPhysicalOptimizationConfig().getMaxFramesForJoinLeftInput(),
+ context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame(),
+ context.getPhysicalOptimizationConfig().getFudgeFactor()));
if (partitioningType == JoinPartitioningType.BROADCAST) {
hybridToInMemHashJoin(op, context);
}
- // op.setPhysicalOperator(new
- // InMemoryHashJoinPOperator(op.getJoinKind(), partitioningType,
- // sideLeft, sideRight,
- // 1024 * 512));
}
private static void hybridToInMemHashJoin(AbstractBinaryJoinOperator op, IOptimizationContext context)
@@ -109,17 +107,17 @@
ILogicalOperator opBuild = op.getInputs().get(1).getValue();
LogicalPropertiesVisitor.computeLogicalPropertiesDFS(opBuild, context);
ILogicalPropertiesVector v = context.getLogicalPropertiesVector(opBuild);
- AlgebricksConfig.ALGEBRICKS_LOGGER.fine("// HybridHashJoin inner branch -- Logical properties for " + opBuild
- + ": " + v + "\n");
+ AlgebricksConfig.ALGEBRICKS_LOGGER
+ .fine("// HybridHashJoin inner branch -- Logical properties for " + opBuild + ": " + v + "\n");
if (v != null) {
int size2 = v.getMaxOutputFrames();
HybridHashJoinPOperator hhj = (HybridHashJoinPOperator) op.getPhysicalOperator();
if (size2 > 0 && size2 * hhj.getFudgeFactor() <= hhj.getMemSizeInFrames()) {
- AlgebricksConfig.ALGEBRICKS_LOGGER.fine("// HybridHashJoin inner branch " + opBuild
- + " fits in memory\n");
+ AlgebricksConfig.ALGEBRICKS_LOGGER
+ .fine("// HybridHashJoin inner branch " + opBuild + " fits in memory\n");
// maintains the local properties on the probe side
- op.setPhysicalOperator(new InMemoryHashJoinPOperator(hhj.getKind(), hhj.getPartitioningType(), hhj
- .getKeysLeftBranch(), hhj.getKeysRightBranch(), v.getNumberOfTuples() * 2));
+ op.setPhysicalOperator(new InMemoryHashJoinPOperator(hhj.getKind(), hhj.getPartitioningType(),
+ hhj.getKeysLeftBranch(), hhj.getKeysRightBranch(), v.getNumberOfTuples() * 2));
}
}
@@ -134,8 +132,7 @@
FunctionIdentifier fi = fexp.getFunctionIdentifier();
if (fi.equals(AlgebricksBuiltinFunctions.AND)) {
for (Mutable<ILogicalExpression> a : fexp.getArguments()) {
- if (!isHashJoinCondition(a.getValue(), inLeftAll, inRightAll, outLeftFields,
- outRightFields)) {
+ if (!isHashJoinCondition(a.getValue(), inLeftAll, inRightAll, outLeftFields, outRightFields)) {
return false;
}
}
@@ -170,9 +167,8 @@
return true;
}
}
- default: {
+ default:
return false;
- }
}
}
@@ -201,7 +197,7 @@
default:
return null;
}
- ArrayList<LogicalVariable> vars = new ArrayList<LogicalVariable>();
+ ArrayList<LogicalVariable> vars = new ArrayList<>();
fexp.getArguments().get(i).getValue().getUsedVariables(vars);
if (varsLeft.containsAll(vars)) {
return BroadcastSide.LEFT;