Merge branch 'master' of https://code.google.com/p/hyracks
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index e0ee1f0..d66ddb3 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -62,7 +62,6 @@
protected IMergeAggregationExpressionFactory mergeAggregationExpressionFactory;
protected PhysicalOptimizationConfig physicalOptimizationConfig = new PhysicalOptimizationConfig();
protected AlgebricksPartitionConstraint clusterLocations;
- protected int frameSize = -1;
public abstract ICompilerFactory create();
@@ -195,14 +194,6 @@
return normalizedKeyComputerFactoryProvider;
}
- public void setFrameSize(int frameSize) {
- this.frameSize = frameSize;
- }
-
- public int getFrameSize() {
- return frameSize;
- }
-
public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
return partialAggregationTypeComputer;
}
diff --git a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 1c62307..5da4e67 100644
--- a/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -42,12 +42,12 @@
}
@Override
- public IOptimizationContext createOptimizationContext(int varCounter, int frameSize,
+ public IOptimizationContext createOptimizationContext(int varCounter,
IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
PhysicalOptimizationConfig physicalOptimizationConfig) {
- return new AlgebricksOptimizationContext(varCounter, frameSize, expressionEvalSizeComputer,
+ return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
mergeAggregationExpressionFactory, expressionTypeComputer, nullableTypeComputer,
physicalOptimizationConfig);
}
@@ -69,7 +69,7 @@
@Override
public ICompiler createCompiler(final ILogicalPlan plan, final IMetadataProvider<?, ?> metadata,
int varCounter) {
- final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter, frameSize,
+ final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter,
expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
nullableTypeComputer, physicalOptimizationConfig);
oc.setMetadataDeclarations(metadata);
@@ -91,7 +91,9 @@
binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider,
nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
- partialAggregationTypeComputer, predEvaluatorFactoryProvider, frameSize, clusterLocations);
+ partialAggregationTypeComputer, predEvaluatorFactoryProvider,
+ physicalOptimizationConfig.getFrameSize(), clusterLocations);
+
PlanCompiler pc = new PlanCompiler(context);
return pc.compilePlan(plan, null, jobEventListenerFactory);
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 0aa1ff6..4f9ddf0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -32,9 +32,6 @@
public abstract int getVarCounter();
- // -1 if unknown
- public abstract int getFrameSize();
-
public abstract void setVarCounter(int varCounter);
public abstract LogicalVariable newVar();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index e4fd78e..1979048 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -294,7 +294,7 @@
}
overhead += sz;
}
- int frameSize = context.getFrameSize();
+ int frameSize = context.getPhysicalOptimizationConfig().getFrameSize();
if (frameSize > 0) {
long sz = frames0 * frameSize + overhead * v.getNumberOfTuples();
int frames1 = (int) (sz / frameSize);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 738fc7f..79b463f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -70,17 +70,14 @@
protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>>();
protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<ILogicalOperator, ILogicalPropertiesVector>();
- private final int frameSize;
private final IExpressionTypeComputer expressionTypeComputer;
private final INullableTypeComputer nullableTypeComputer;
- public AlgebricksOptimizationContext(int varCounter, int frameSize,
- IExpressionEvalSizeComputer expressionEvalSizeComputer,
+ public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
PhysicalOptimizationConfig physicalOptimizationConfig) {
this.varCounter = varCounter;
- this.frameSize = frameSize;
this.expressionEvalSizeComputer = expressionEvalSizeComputer;
this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory;
this.expressionTypeComputer = expressionTypeComputer;
@@ -220,11 +217,6 @@
return varEvalSizeEnv;
}
- @Override
- public int getFrameSize() {
- return frameSize;
- }
-
public IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory() {
return mergeAggregationExpressionFactory;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
index 1373fa7..9f141e3 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
@@ -21,7 +21,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.INullableTypeComputer;
public interface IOptimizationContextFactory {
- public IOptimizationContext createOptimizationContext(int varCounter, int frameSize,
+ public IOptimizationContext createOptimizationContext(int varCounter,
IExpressionEvalSizeComputer expressionEvalSizeComputer,
IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index fc6c198..c3858ba 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -4,10 +4,15 @@
public class PhysicalOptimizationConfig {
private static final int MB = 1048576;
+
private static final String FRAMESIZE = "FRAMESIZE";
private static final String MAX_FRAMES_EXTERNAL_SORT = "MAX_FRAMES_EXTERNAL_SORT";
private static final String MAX_FRAMES_EXTERNAL_GROUP_BY = "MAX_FRAMES_EXTERNAL_GROUP_BY";
-
+ private static final String MAX_FRAMES_LEFT_INPUT_HYBRID_HASH = "MAX_FRAMES_LEFT_INPUT_HYBRID_HASH";
+ private static final String MAX_FRAMES_HYBRID_HASH = "MAX_FRAMES_HYBRID_HASH";
+ private static final String FUDGE_FACTOR = "FUDGE_FACTOR";
+ private static final String MAX_RECORDS_PER_FRAME = "MAX_RECORDS_PER_FRAME";
+
private static final String DEFAULT_HASH_GROUP_TABLE_SIZE = "DEFAULT_HASH_GROUP_TABLE_SIZE";
private static final String DEFAULT_EXTERNAL_GROUP_TABLE_SIZE = "DEFAULT_EXTERNAL_GROUP_TABLE_SIZE";
private static final String DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE = "DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE";
@@ -33,14 +38,39 @@
public void setFrameSize(int frameSize) {
setInt(FRAMESIZE, frameSize);
}
-
- public int getMaxFramesExternalSort() {
- int frameSize = getFrameSize();
- return getInt(MAX_FRAMES_EXTERNAL_SORT, (int) (((long) 512 * MB) / frameSize));
+
+ public double getFudgeFactor() {
+ return getDouble(FUDGE_FACTOR, 1.3);
}
- public void setMaxFramesExternalSort(int frameLimit) {
- setInt(MAX_FRAMES_EXTERNAL_SORT, frameLimit);
+ public void setFudgeFactor(double fudgeFactor) {
+ setDouble(FUDGE_FACTOR, fudgeFactor);
+ }
+
+ public int getMaxRecordsPerFrame() {
+ return getInt(MAX_RECORDS_PER_FRAME, 512);
+ }
+
+ public void setMaxRecordsPerFrame(int maxRecords) {
+ setInt(MAX_RECORDS_PER_FRAME, maxRecords);
+ }
+
+ public int getMaxFramesLeftInputHybridHash() {
+ int frameSize = getFrameSize();
+ return getInt(MAX_FRAMES_LEFT_INPUT_HYBRID_HASH, (int) (140L * 1024 * MB / frameSize));
+ }
+
+ public void setMaxFramesLeftInputHybridHash(int frameLimit) {
+ setInt(MAX_FRAMES_LEFT_INPUT_HYBRID_HASH, frameLimit);
+ }
+
+ public int getMaxFramesHybridHash() {
+ int frameSize = getFrameSize();
+ return getInt(MAX_FRAMES_HYBRID_HASH, (int) (64L * MB / frameSize));
+ }
+
+ public void setMaxFramesHybridHash(int frameLimit) {
+ setInt(MAX_FRAMES_HYBRID_HASH, frameLimit);
}
public int getMaxFramesExternalGroupBy() {
@@ -51,6 +81,15 @@
public void setMaxFramesExternalGroupBy(int frameLimit) {
setInt(MAX_FRAMES_EXTERNAL_GROUP_BY, frameLimit);
}
+
+ public int getMaxFramesExternalSort() {
+ int frameSize = getFrameSize();
+ return getInt(MAX_FRAMES_EXTERNAL_SORT, (int) (((long) 32 * MB) / frameSize));
+ }
+
+ public void setMaxFramesExternalSort(int frameLimit) {
+ setInt(MAX_FRAMES_EXTERNAL_SORT, frameLimit);
+ }
public int getHashGroupByTableSize() {
return getInt(DEFAULT_HASH_GROUP_TABLE_SIZE, 10485767);
@@ -87,5 +126,17 @@
else
return Integer.parseInt(value);
}
+
+ private void setDouble(String property, double value) {
+ properties.setProperty(property, Double.toString(value));
+ }
+
+ private double getDouble(String property, double defaultValue) {
+ String value = properties.getProperty(property);
+ if (value == null)
+ return defaultValue;
+ else
+ return Double.parseDouble(value);
+ }
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/JoinUtils.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/JoinUtils.java
index ddc00e3..b2eb07b 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -46,14 +46,6 @@
public class JoinUtils {
- private final static int MB = 1048576;
-
- private final static double DEFAULT_FUDGE_FACTOR = 1.3;
- private final static int MAX_RECORDS_PER_FRAME = 512;
- private final static int DEFAULT_FRAME_SIZE = 32768;
- private final static int MAX_LEFT_INPUT_SIZE_HYBRID_HASH = (int) (140L * 1024 * MB / DEFAULT_FRAME_SIZE);
- private final static int DEFAULT_MEMORY_SIZE_HYBRID_HASH = (int) (256L * MB / DEFAULT_FRAME_SIZE);
-
public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, IOptimizationContext context)
throws AlgebricksException {
List<LogicalVariable> sideLeft = new LinkedList<LogicalVariable>();
@@ -82,21 +74,23 @@
}
}
} else {
- setNLJoinOp(op);
+ setNLJoinOp(op, context);
}
}
- private static void setNLJoinOp(AbstractBinaryJoinOperator op) {
- op.setPhysicalOperator(new NLJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
- DEFAULT_MEMORY_SIZE_HYBRID_HASH));
+ private static void setNLJoinOp(AbstractBinaryJoinOperator op, IOptimizationContext context) {
+ op.setPhysicalOperator(new NLJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, context
+ .getPhysicalOptimizationConfig().getMaxRecordsPerFrame()));
}
private static void setHashJoinOp(AbstractBinaryJoinOperator op, JoinPartitioningType partitioningType,
List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IOptimizationContext context)
throws AlgebricksException {
op.setPhysicalOperator(new HybridHashJoinPOperator(op.getJoinKind(), partitioningType, sideLeft, sideRight,
- DEFAULT_MEMORY_SIZE_HYBRID_HASH, MAX_LEFT_INPUT_SIZE_HYBRID_HASH, MAX_RECORDS_PER_FRAME,
- DEFAULT_FUDGE_FACTOR));
+ context.getPhysicalOptimizationConfig().getMaxFramesHybridHash(), context
+ .getPhysicalOptimizationConfig().getMaxFramesLeftInputHybridHash(), context
+ .getPhysicalOptimizationConfig().getMaxRecordsPerFrame(), context
+ .getPhysicalOptimizationConfig().getFudgeFactor()));
if (partitioningType == JoinPartitioningType.BROADCAST) {
hybridToInMemHashJoin(op, context);
}