cross merge fullstack_release_candidate into trunk
git-svn-id: https://hyracks.googlecode.com/svn/trunk@3208 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/algebricks/algebricks-common/pom.xml b/fullstack/algebricks/algebricks-common/pom.xml
index a5677a5..521ef12 100644
--- a/fullstack/algebricks/algebricks-common/pom.xml
+++ b/fullstack/algebricks/algebricks-common/pom.xml
@@ -16,8 +16,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
</plugins>
diff --git a/fullstack/algebricks/algebricks-compiler/pom.xml b/fullstack/algebricks/algebricks-compiler/pom.xml
index 8dc083d..bd35835 100644
--- a/fullstack/algebricks/algebricks-compiler/pom.xml
+++ b/fullstack/algebricks/algebricks-compiler/pom.xml
@@ -16,8 +16,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
</plugins>
diff --git a/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
index f1e7acb..dde4443 100644
--- a/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
+++ b/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
@@ -44,6 +45,7 @@
protected ITypeTraitProvider typeTraitProvider;
protected ISerializerDeserializerProvider serializerDeserializerProvider;
protected IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
+ protected IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
protected IBinaryComparatorFactoryProvider comparatorFactoryProvider;
protected IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory;
protected IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
@@ -94,6 +96,14 @@
return hashFunctionFactoryProvider;
}
+ /**
+ * Stores the binary hash-function family provider on this builder.
+ *
+ * @param hashFunctionFamilyProvider the provider to keep; it is assigned as-is, without validation
+ */
+ public void setHashFunctionFamilyProvider(IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider) {
+ this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
+ }
+
+ /**
+ * Returns the binary hash-function family provider currently held by this builder.
+ *
+ * @return the stored provider, or {@code null} if none has been set
+ */
+ public IBinaryHashFunctionFamilyProvider getHashFunctionFamilyProvider() {
+ return hashFunctionFamilyProvider;
+ }
+
public void setComparatorFactoryProvider(IBinaryComparatorFactoryProvider comparatorFactoryProvider) {
this.comparatorFactoryProvider = comparatorFactoryProvider;
}
diff --git a/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 1d21463..edc1b66 100644
--- a/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ b/fullstack/algebricks/algebricks-compiler/src/main/java/edu/uci/ics/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -84,11 +84,12 @@
public JobSpecification createJob(Object appContext) throws AlgebricksException {
+ // Builds a JobGenContext from the configured providers and compiles the plan with a
+ // PlanCompiler. The argument order below must match the JobGenContext constructor;
+ // hashFunctionFamilyProvider sits right after hashFunctionFactoryProvider.
AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting Job Generation.\n");
JobGenContext context = new JobGenContext(null, metadata, appContext,
- serializerDeserializerProvider, hashFunctionFactoryProvider, comparatorFactoryProvider,
- typeTraitProvider, binaryBooleanInspectorFactory, binaryIntegerInspectorFactory,
- printerProvider, nullWriterFactory, normalizedKeyComputerFactoryProvider,
- expressionRuntimeProvider, expressionTypeComputer, nullableTypeComputer, oc,
- expressionEvalSizeComputer, partialAggregationTypeComputer, frameSize, clusterLocations);
+ serializerDeserializerProvider, hashFunctionFactoryProvider,
+ hashFunctionFamilyProvider, comparatorFactoryProvider, typeTraitProvider,
+ binaryBooleanInspectorFactory, binaryIntegerInspectorFactory, printerProvider,
+ nullWriterFactory, normalizedKeyComputerFactoryProvider, expressionRuntimeProvider,
+ expressionTypeComputer, nullableTypeComputer, oc, expressionEvalSizeComputer,
+ partialAggregationTypeComputer, frameSize, clusterLocations);
PlanCompiler pc = new PlanCompiler(context);
return pc.compilePlan(plan, null);
}
diff --git a/fullstack/algebricks/algebricks-core/pom.xml b/fullstack/algebricks/algebricks-core/pom.xml
index a74a540..def5b35 100644
--- a/fullstack/algebricks/algebricks-core/pom.xml
+++ b/fullstack/algebricks/algebricks-core/pom.xml
@@ -16,8 +16,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
</plugins>
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index 165fccd..6701385 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -89,4 +89,9 @@
public IPhysicalPropertiesVector getDeliveredPhysicalProperties();
public void computeDeliveredPhysicalProperties(IOptimizationContext context) throws AlgebricksException;
+
+ /**
+ * Indicates whether the expressions used by this operator must be variable reference expressions.
+ *
+ * @return {@code true} if the operator's expressions have to be variable references,
+ * {@code false} if other kinds of expressions are acceptable
+ */
+ public boolean requiresVariableReferenceExpressions();
}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 468d25c..0aa1ff6 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -51,6 +51,8 @@
* returns true if op1 and op2 have already been compared
*/
public abstract boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2);
+
+ /**
+ * NOTE(review): presumably drops {@code op1} from the bookkeeping that
+ * {@code checkAndAddToAlreadyCompared} maintains -- confirm against the implementing class.
+ *
+ * @param op1 the operator to remove
+ */
+ public abstract void removeFromAlreadyCompared(ILogicalOperator op1);
public abstract void addNotToBeInlinedVar(LogicalVariable var);
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 5234d2c..b8bdf3e 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -20,6 +20,7 @@
CLUSTER,
DATASOURCESCAN,
DISTINCT,
+ DISTRIBUTE_RESULT,
GROUP,
EMPTYTUPLESOURCE,
EXCHANGE,
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index a969372..32cfb9a 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -7,6 +7,7 @@
BTREE_SEARCH,
STATS,
DATASOURCE_SCAN,
+ DISTRIBUTE_RESULT,
EMPTY_TUPLE_SOURCE,
EXTERNAL_GROUP_BY,
IN_MEMORY_HASH_JOIN,
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java
index bf3f82b..b284b22 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ScalarFunctionCallExpression.java
@@ -39,9 +39,11 @@
+ /**
+ * Creates a new {@code ScalarFunctionCallExpression} for the same function info with cloned
+ * arguments, then copies the cloned annotations and hands over the opaque parameters.
+ *
+ * @return the newly created expression
+ */
@Override
public ScalarFunctionCallExpression cloneExpression() {
- cloneAnnotations();
List<Mutable<ILogicalExpression>> clonedArgs = cloneArguments();
- return new ScalarFunctionCallExpression(finfo, clonedArgs);
+ ScalarFunctionCallExpression funcExpr = new ScalarFunctionCallExpression(finfo, clonedArgs);
+ // The result of cloneAnnotations() is put into the clone's annotation map.
+ funcExpr.getAnnotations().putAll(cloneAnnotations());
+ // The value returned by getOpaqueParameters() is passed on directly, with no copy made here.
+ funcExpr.setOpaqueParameters(this.getOpaqueParameters());
+ return funcExpr;
}
@Override
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java
index 71932d8..652f9b0 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/UnnestingFunctionCallExpression.java
@@ -45,6 +45,7 @@
List<Mutable<ILogicalExpression>> clonedArgs = cloneArguments();
UnnestingFunctionCallExpression ufce = new UnnestingFunctionCallExpression(finfo, clonedArgs);
ufce.setReturnsUniqueValues(returnsUniqueValues);
+ ufce.setOpaqueParameters(this.getOpaqueParameters());
return ufce;
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 899b633..82187e3 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -51,6 +51,10 @@
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
throws AlgebricksException;
+ /**
+ * NOTE(review): presumably builds the runtime that hands results of {@code sink} back to the
+ * client (counterpart of the write-file runtime declared above) -- confirm with implementers.
+ *
+ * @param sink the data sink the results are associated with
+ * @param printColumns indexes of the columns to print
+ * @param printerFactories printer factories for the printed columns
+ * @param inputDesc record descriptor of the input
+ * @param ordered flag passed through to the implementation; presumably whether result order matters
+ * @param spec the job specification the operator is created for
+ * @return the operator descriptor paired with its partition constraint
+ * @throws AlgebricksException declared for failures while creating the runtime
+ */
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+ int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+ JobSpecification spec) throws AlgebricksException;
+
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index dc0edfe..64dbdef 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -182,4 +182,9 @@
return new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getNullableTypeComputer(),
ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
}
+
+ /**
+ * Default answer for operators that do not override this method: expressions must be
+ * variable references.
+ *
+ * @return always {@code true}
+ */
+ @Override
+ public boolean requiresVariableReferenceExpressions() {
+ return true;
+ }
}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
index a4dc2e0..4543997 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AssignOperator.java
@@ -89,4 +89,8 @@
return env;
}
+ /**
+ * Reports that this operator's expressions need not be variable references.
+ *
+ * @return always {@code false}
+ */
+ @Override
+ public boolean requiresVariableReferenceExpressions() {
+ return false;
+ }
}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
index 3227f3d..3c21c8f 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DataSourceScanOperator.java
@@ -106,5 +106,4 @@
}
return env;
}
-
}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
index 20aa574..03bfcba 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
@@ -81,4 +81,8 @@
return createPropagatingAllInputsTypeEnvironment(ctx);
}
+ /**
+ * Reports that this operator's expressions need not be variable references.
+ *
+ * @return always {@code false}
+ */
+ @Override
+ public boolean requiresVariableReferenceExpressions() {
+ return false;
+ }
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
index ee0dcf6..b1da831 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
@@ -49,12 +50,34 @@
+ /**
+ * Rebuilds the schema from this operator's own expressions: the schema list is created if
+ * absent, emptied, and then filled with the variable of every expression whose tag is
+ * {@code VARIABLE}. Expressions with any other tag contribute nothing.
+ */
@Override
public void recomputeSchema() {
- schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
+ if (schema == null) {
+ schema = new ArrayList<LogicalVariable>();
+ }
+ // Reuse the existing list instance instead of replacing it.
+ schema.clear();
+ for (Mutable<ILogicalExpression> eRef : expressions) {
+ ILogicalExpression e = eRef.getValue();
+ if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ VariableReferenceExpression v = (VariableReferenceExpression) e;
+ schema.add(v.getVariableReference());
+ }
+ }
}
+ /**
+ * Returns a policy that adds to the target schema only the variables referenced by this
+ * operator's {@code VARIABLE}-tagged expressions. The source schemas are not consulted.
+ * A new policy object is created on every call.
+ *
+ * @return the propagation policy described above
+ */
@Override
public VariablePropagationPolicy getVariablePropagationPolicy() {
- return VariablePropagationPolicy.ALL;
+ return new VariablePropagationPolicy() {
+ @Override
+ public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+ throws AlgebricksException {
+ for (Mutable<ILogicalExpression> eRef : expressions) {
+ ILogicalExpression e = eRef.getValue();
+ if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ VariableReferenceExpression v = (VariableReferenceExpression) e;
+ target.addVariable(v.getVariableReference());
+ }
+ }
+ }
+ };
}
@Override
@@ -105,7 +128,16 @@
+ /**
+ * Builds a {@code NonPropagatingTypeEnvironment} and registers, for each expression tagged
+ * {@code VARIABLE}, the referenced variable with the type that the first input's output type
+ * environment reports for that expression.
+ *
+ * @param ctx typing context supplying the type computer, metadata provider and child environments
+ * @return the environment containing only the variables registered here
+ * @throws AlgebricksException declared for failures while looking up types
+ */
@Override
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
- return createPropagatingAllInputsTypeEnvironment(ctx);
+ IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getMetadataProvider());
+ IVariableTypeEnvironment childEnv = ctx.getOutputTypeEnvironment(inputs.get(0).getValue());
+ for (Mutable<ILogicalExpression> exprRef : expressions) {
+ ILogicalExpression expr = exprRef.getValue();
+ if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr;
+ env.setVarType(varRefExpr.getVariableReference(), childEnv.getType(expr));
+ }
+ }
+ return env;
}
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java
new file mode 100644
index 0000000..6ca6d87
--- /dev/null
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistributeResultOperator.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Logical operator tagged {@link LogicalOperatorTag#DISTRIBUTE_RESULT}. It holds the expression
+ * list and the {@link IDataSink} given at construction time, and passes through the schema and
+ * the types of its first input.
+ */
+public class DistributeResultOperator extends AbstractLogicalOperator {
+    private List<Mutable<ILogicalExpression>> expressions;
+    private IDataSink dataSink;
+
+    /**
+     * @param expressions expression references held by this operator (stored as-is, not copied)
+     * @param dataSink the sink associated with the distributed result
+     */
+    public DistributeResultOperator(List<Mutable<ILogicalExpression>> expressions, IDataSink dataSink) {
+        this.expressions = expressions;
+        this.dataSink = dataSink;
+    }
+
+    /** @return the expression list passed to the constructor */
+    public List<Mutable<ILogicalExpression>> getExpressions() {
+        return expressions;
+    }
+
+    /** @return the data sink passed to the constructor */
+    public IDataSink getDataSink() {
+        return dataSink;
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.DISTRIBUTE_RESULT;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitDistributeResultOperator(this, arg);
+    }
+
+    /**
+     * Applies the transform to every expression reference, in list order.
+     *
+     * @return {@code true} if at least one call to the transform reported a modification
+     */
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+        boolean changed = false;
+        for (int idx = 0; idx < expressions.size(); idx++) {
+            // Non-short-circuit update: the transform is invoked for every element.
+            changed |= visitor.transform(expressions.get(idx));
+        }
+        return changed;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return VariablePropagationPolicy.ALL;
+    }
+
+    @Override
+    public boolean isMap() {
+        // actually depends on the physical op.
+        return false;
+    }
+
+    /** Replaces the schema with a fresh list holding the schema of the first input. */
+    @Override
+    public void recomputeSchema() {
+        schema = new ArrayList<LogicalVariable>();
+        List<LogicalVariable> inputSchema = inputs.get(0).getValue().getSchema();
+        schema.addAll(inputSchema);
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        return createPropagatingAllInputsTypeEnvironment(ctx);
+    }
+
+}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
index 101b6f5..5aa858f 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
@@ -51,7 +51,7 @@
+ /**
+ * Forwards the expression transform to the wrapped delegate.
+ *
+ * @param transform the transform to apply
+ * @return whatever the delegate's {@code acceptExpressionTransform} returns
+ * @throws AlgebricksException if the delegate throws it
+ */
@Override
public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
- return false;
+ return delegate.acceptExpressionTransform(transform);
}
@Override
@@ -108,4 +108,13 @@
return this.createPropagatingAllInputsTypeEnvironment(ctx);
}
+ /**
+ * @return the string representation of the wrapped delegate
+ */
+ @Override
+ public String toString() {
+ return delegate.toString();
+ }
+
+ /**
+ * @return the {@code IOperatorExtension} this operator wraps
+ */
+ public IOperatorExtension getDelegate() {
+ return delegate;
+ }
+
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
index 98c3301..0a80337 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+import java.util.Collection;
import java.util.List;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -42,5 +43,8 @@
void setPhysicalOperator(IPhysicalOperator physicalOperator);
ExecutionMode getExecutionMode();
-
+
+ /**
+ * NOTE(review): presumably adds the variables this extension reads to {@code usedVars} --
+ * confirm against implementations.
+ *
+ * @param usedVars output collection supplied by the caller
+ */
+ public void getUsedVariables(Collection<LogicalVariable> usedVars);
+
+ /**
+ * NOTE(review): presumably adds the variables this extension produces to {@code producedVars} --
+ * confirm against implementations.
+ *
+ * @param producedVars output collection supplied by the caller
+ */
+ public void getProducedVariables(Collection<LogicalVariable> producedVars);
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LimitOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LimitOperator.java
index 3c6a699..8e67ed9 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LimitOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/LimitOperator.java
@@ -109,5 +109,4 @@
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
return createPropagatingAllInputsTypeEnvironment(ctx);
}
-
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java
index 8c611b0..fda920a 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SelectOperator.java
@@ -102,4 +102,8 @@
return env;
}
+ /**
+ * Reports that this operator's expressions need not be variable references.
+ *
+ * @return always {@code false}
+ */
+ @Override
+ public boolean requiresVariableReferenceExpressions() {
+ return false;
+ }
}
\ No newline at end of file
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 0539cbe..1b4be1e 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -47,8 +47,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -64,7 +66,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -476,6 +477,14 @@
}
+ /**
+ * Handles a distribute-result operator by delegating to {@code setEmptyFDsEqClasses}
+ * (presumably recording empty functional-dependency and equivalence-class sets -- confirm).
+ *
+ * @return always {@code null}
+ */
@Override
+ public Void visitDistributeResultOperator(DistributeResultOperator op, IOptimizationContext ctx)
+ throws AlgebricksException {
+ // The propagating alternative below is left disabled.
+ // propagateFDsAndEquivClasses(op, ctx);
+ setEmptyFDsEqClasses(op, ctx);
+ return null;
+ }
+
+ @Override
public Void visitWriteResultOperator(WriteResultOperator op, IOptimizationContext ctx) throws AlgebricksException {
// propagateFDsAndEquivClasses(op, ctx);
setEmptyFDsEqClasses(op, ctx);
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 31061db..ac6d887 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -38,6 +38,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
@@ -345,7 +346,7 @@
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
if (aop.getOperatorTag() != LogicalOperatorTag.UNNEST_MAP)
return Boolean.FALSE;
- UnnestOperator unnestOpArg = (UnnestOperator) copyAndSubstituteVar(op, arg);
+ UnnestMapOperator unnestOpArg = (UnnestMapOperator) copyAndSubstituteVar(op, arg);
boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getVariables(), unnestOpArg.getVariables());
if (!isomorphic)
return Boolean.FALSE;
@@ -425,6 +426,17 @@
}
+    /**
+     * Compares a distribute-result operator with {@code arg}: the operators match only if
+     * {@code arg} carries the {@code DISTRIBUTE_RESULT} tag and, after
+     * {@code copyAndSubstituteVar}, both schemas hold the same variables regardless of order.
+     */
@Override
+    public Boolean visitDistributeResultOperator(DistributeResultOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator other = (AbstractLogicalOperator) arg;
+        if (other.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
+            return Boolean.FALSE;
+        }
+        DistributeResultOperator substituted = (DistributeResultOperator) copyAndSubstituteVar(op, arg);
+        boolean sameSchema = VariableUtilities.varListEqualUnordered(op.getSchema(), substituted.getSchema());
+        return sameSchema;
+    }
+
+ @Override
public Boolean visitWriteResultOperator(WriteResultOperator op, ILogicalOperator arg) throws AlgebricksException {
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
if (aop.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT)
@@ -762,6 +774,14 @@
}
+ /**
+ * Creates a new {@code DistributeResultOperator} whose expression list is filled by
+ * {@code deepCopyExpressionRefs} from the original's expressions. The data sink is passed
+ * to the new operator as the same object, not copied.
+ */
@Override
+ public ILogicalOperator visitDistributeResultOperator(DistributeResultOperator op, Void arg)
+ throws AlgebricksException {
+ ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+ deepCopyExpressionRefs(newExpressions, op.getExpressions());
+ return new DistributeResultOperator(newExpressions, op.getDataSink());
+ }
+
+ @Override
public ILogicalOperator visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
ArrayList<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
deepCopyExpressionRefs(newKeyExpressions, op.getKeyExpressions());
@@ -784,8 +804,8 @@
deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
- Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(((AbstractLogicalExpression)op.getFilterExpression())
- .cloneExpression());
+ Mutable<ILogicalExpression> newFilterExpression = new MutableObject<ILogicalExpression>(
+ ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
return new IndexInsertDeleteOperator(op.getDataSourceIndex(), newPrimaryKeyExpressions,
newSecondaryKeyExpressions, newFilterExpression, op.getOperation());
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 562bb4c..b9544c7 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -37,8 +37,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -54,7 +56,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -230,6 +231,13 @@
}
@Override
+ public Void visitDistributeResultOperator(DistributeResultOperator op, ILogicalOperator arg)
+ throws AlgebricksException {
+ mapVariablesStandard(op, arg);
+ return null;
+ }
+
+ @Override
public Void visitWriteResultOperator(WriteResultOperator op, ILogicalOperator arg) throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 9b2f5a0..8f1d686 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -29,8 +29,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -46,7 +48,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -240,6 +241,12 @@
}
@Override
+ public Void visitDistributeResultOperator(DistributeResultOperator op, IOptimizationContext arg)
+ throws AlgebricksException {
+ return null;
+ }
+
+ @Override
public Void visitWriteResultOperator(WriteResultOperator op, IOptimizationContext arg) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 994c6cb..31adcba 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -32,8 +32,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -49,7 +51,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -223,6 +224,11 @@
}
@Override
+ public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
return null;
}
@@ -249,6 +255,7 @@
@Override
public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ op.getDelegate().getProducedVariables(producedVariables);
return null;
}
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 9295179..cd0cee3 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -31,8 +31,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -48,7 +50,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -85,7 +86,13 @@
@Override
public Void visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
- standardLayout(op);
+ for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {
+ ILogicalExpression expr = exprRef.getValue();
+ if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr;
+ schemaVariables.add(varRefExpr.getVariableReference());
+ }
+ }
return null;
}
@@ -232,6 +239,12 @@
}
@Override
+ public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) throws AlgebricksException {
+ standardLayout(op);
+ return null;
+ }
+
+ @Override
public Void visitWriteResultOperator(WriteResultOperator op, Void arg) throws AlgebricksException {
standardLayout(op);
return null;
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 11e56ca..69fb3f8 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -33,8 +33,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -51,7 +53,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -335,6 +336,16 @@
}
@Override
+ public Void visitDistributeResultOperator(DistributeResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+ throws AlgebricksException {
+ for (Mutable<ILogicalExpression> e : op.getExpressions()) {
+ e.getValue().substituteVar(pair.first, pair.second);
+ }
+ substVarTypes(op, pair);
+ return null;
+ }
+
+ @Override
public Void visitWriteResultOperator(WriteResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 3a82ccd..5361a19 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -25,14 +25,17 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -49,13 +52,17 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> {
@@ -106,8 +113,49 @@
}
@Override
- public Void visitExchangeOperator(ExchangeOperator op, Void arg) {
- // does not use any variable
+ public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+ // Used variables depend on the physical operator.
+ if (op.getPhysicalOperator() != null) {
+ IPhysicalOperator physOp = op.getPhysicalOperator();
+ switch (physOp.getOperatorTag()) {
+ case BROADCAST_EXCHANGE:
+ case ONE_TO_ONE_EXCHANGE:
+ case RANDOM_MERGE_EXCHANGE: {
+ // No variables used.
+ break;
+ }
+ case HASH_PARTITION_EXCHANGE: {
+ HashPartitionExchangePOperator concreteOp = (HashPartitionExchangePOperator) physOp;
+ usedVariables.addAll(concreteOp.getHashFields());
+ break;
+ }
+ case HASH_PARTITION_MERGE_EXCHANGE: {
+ HashPartitionMergeExchangePOperator concreteOp = (HashPartitionMergeExchangePOperator) physOp;
+ usedVariables.addAll(concreteOp.getPartitionFields());
+ for (OrderColumn orderCol : concreteOp.getOrderColumns()) {
+ usedVariables.add(orderCol.getColumn());
+ }
+ break;
+ }
+ case SORT_MERGE_EXCHANGE: {
+ SortMergeExchangePOperator concreteOp = (SortMergeExchangePOperator) physOp;
+ for (OrderColumn orderCol : concreteOp.getSortColumns()) {
+ usedVariables.add(orderCol.getColumn());
+ }
+ break;
+ }
+ case RANGE_PARTITION_EXCHANGE: {
+ RangePartitionPOperator concreteOp = (RangePartitionPOperator) physOp;
+ for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
+ usedVariables.add(partCol.getColumn());
+ }
+ break;
+ }
+ default: {
+ throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");
+ }
+ }
+ }
return null;
}
@@ -257,6 +305,14 @@
}
@Override
+ public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) {
+ for (Mutable<ILogicalExpression> expr : op.getExpressions()) {
+ expr.getValue().getUsedVariables(usedVariables);
+ }
+ return null;
+ }
+
+ @Override
public Void visitWriteResultOperator(WriteResultOperator op, Void arg) {
op.getPayloadExpression().getValue().getUsedVariables(usedVariables);
for (Mutable<ILogicalExpression> e : op.getKeyExpressions()) {
@@ -297,6 +353,7 @@
@Override
public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ op.getDelegate().getUsedVariables(usedVariables);
return null;
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 25f22e7..f66f99b 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -61,14 +61,22 @@
ITypingContext ctx) throws AlgebricksException {
substituteVariables(op, v1, v2, true, ctx);
}
-
+
+ public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,
+ LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {
+ for (Mutable<ILogicalOperator> childOp : op.getInputs()) {
+ substituteVariablesInDescendantsAndSelf(childOp.getValue(), v1, v2, ctx);
+ }
+ substituteVariables(op, v1, v2, true, ctx);
+ }
+
public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {
ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> visitor = new SubstituteVariableVisitor(
goThroughNts, ctx);
op.accept(visitor, new Pair<LogicalVariable, LogicalVariable>(v1, v2));
}
-
+
public static <T> boolean varListEqualUnordered(List<T> var, List<T> varArg) {
Set<T> varSet = new HashSet<T>();
Set<T> varArgSet = new HashSet<T>();
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
new file mode 100644
index 0000000..302d4d2
--- /dev/null
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class DistributeResultPOperator extends AbstractPhysicalOperator {
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.DISTRIBUTE_RESULT;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ ILogicalOperator op2 = op.getInputs().get(0).getValue();
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ DistributeResultOperator write = (DistributeResultOperator) op;
+ IDataSink sink = write.getDataSink();
+ IPartitioningProperty pp = sink.getPartitioningProperty();
+ StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(pp, null) };
+ return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ DistributeResultOperator resultOp = (DistributeResultOperator) op;
+ IMetadataProvider mp = context.getMetadataProvider();
+
+ JobSpecification spec = builder.getJobSpec();
+
+ int[] columns = new int[resultOp.getExpressions().size()];
+ int i = 0;
+ for (Mutable<ILogicalExpression> exprRef : resultOp.getExpressions()) {
+ ILogicalExpression expr = exprRef.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new NotImplementedException("Only writing variable expressions is supported.");
+ }
+ VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+ LogicalVariable v = varRef.getVariableReference();
+ columns[i++] = inputSchemas[0].findVariable(v);
+ }
+ RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+
+ IPrinterFactory[] pf = JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op),
+ context, columns);
+
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
+ resultOp.getDataSink(), columns, pf, inputDesc, false, spec);
+
+ builder.contributeHyracksOperator(resultOp, runtimeAndConstraints.first);
+ builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+ ILogicalOperator src = resultOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, resultOp, 0);
+ }
+}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
index f3c9e5a..61d4880 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HashPartitionMergeExchangePOperator.java
@@ -158,5 +158,13 @@
comparatorFactories);
return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
}
+
+ public List<LogicalVariable> getPartitionFields() {
+ return partitionFields;
+ }
+
+ public List<OrderColumn> getOrderColumns() {
+ return orderColumns;
+ }
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index c737cc4..6da42b4 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -32,14 +32,21 @@
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
@@ -90,6 +97,8 @@
IVariableTypeEnvironment env = context.getTypeEnvironment(op);
IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
keysLeftBranch, env, context);
+ IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(
+ keysLeftBranch, env, context);
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
int i = 0;
IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
@@ -97,33 +106,75 @@
Object t = env.getVarType(v);
comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
}
- RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
IOperatorDescriptorRegistry spec = builder.getJobSpec();
IOperatorDescriptor opDesc = null;
- try {
- switch (kind) {
- case INNER: {
- opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
- maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
- hashFunFactories, comparatorFactories, recDescriptor);
- break;
- }
- case LEFT_OUTER: {
- INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
- for (int j = 0; j < nullWriterFactories.length; j++) {
- nullWriterFactories[j] = context.getNullWriterFactory();
- }
- opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
- maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
- hashFunFactories, comparatorFactories, recDescriptor, true, nullWriterFactories);
- break;
- }
- default: {
- throw new NotImplementedException();
- }
+
+ boolean optimizedHashJoin = true;
+ for (IBinaryHashFunctionFamily family : hashFunFamilies) {
+ if (family == null) {
+ optimizedHashJoin = false;
+ break;
}
- } catch (HyracksDataException e) {
- throw new AlgebricksException(e);
+ }
+
+ if (!optimizedHashJoin) {
+ try {
+ switch (kind) {
+ case INNER: {
+ opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+ maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
+ hashFunFactories, comparatorFactories, recDescriptor);
+ break;
+ }
+ case LEFT_OUTER: {
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = context.getNullWriterFactory();
+ }
+ opDesc = new HybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+ maxInputBuildSizeInFrames, aveRecordsPerFrame, getFudgeFactor(), keysLeft, keysRight,
+ hashFunFactories, comparatorFactories, recDescriptor, true, nullWriterFactories);
+ break;
+ }
+ default: {
+ throw new NotImplementedException();
+ }
+ }
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
+ } else {
+ try {
+ switch (kind) {
+ case INNER: {
+ opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+ maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+ comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
+ keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
+ keysRight, keysLeft));
+ break;
+ }
+ case LEFT_OUTER: {
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = context.getNullWriterFactory();
+ }
+ opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
+ maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
+ comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
+ keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
+ keysRight, keysLeft), true, nullWriterFactories);
+ break;
+ }
+ default: {
+ throw new NotImplementedException();
+ }
+ }
+ } catch (HyracksDataException e) {
+ throw new AlgebricksException(e);
+ }
}
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
@@ -140,3 +191,72 @@
}
}
+
+/**
+ * {@link ITuplePairComparatorFactory} used by the optimized hybrid hash join.
+ * Every call to {@link #createTuplePairComparator(IHyracksTaskContext)} instantiates one
+ * binary comparator per configured factory and wraps them in a {@link JoinMultiComparator}.
+ */
+class JoinMultiComparatorFactory implements ITuplePairComparatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    /** Factories for the per-key binary comparators, in key order. */
+    private final IBinaryComparatorFactory[] binaryComparatorFactories;
+    /** Key field indexes forwarded to the comparator as its first key list. */
+    private final int[] keysLeft;
+    /** Key field indexes forwarded to the comparator as its second key list. */
+    private final int[] keysRight;
+
+    /**
+     * Stores the given arrays by reference; no copies are made.
+     *
+     * @param binaryComparatorFactory one comparator factory per key pair
+     * @param keysLeft key field indexes passed on as the comparator's first key list
+     * @param keysRight key field indexes passed on as the comparator's second key list
+     */
+    public JoinMultiComparatorFactory(IBinaryComparatorFactory[] binaryComparatorFactory, int[] keysLeft,
+            int[] keysRight) {
+        this.keysLeft = keysLeft;
+        this.keysRight = keysRight;
+        this.binaryComparatorFactories = binaryComparatorFactory;
+    }
+
+    /**
+     * Builds a new {@link JoinMultiComparator} backed by freshly created binary comparators.
+     *
+     * @param ctx task context; not used by this implementation
+     * @return a comparator over the configured key lists
+     */
+    @Override
+    public ITuplePairComparator createTuplePairComparator(IHyracksTaskContext ctx) {
+        final IBinaryComparator[] comparators = new IBinaryComparator[binaryComparatorFactories.length];
+        int slot = 0;
+        for (IBinaryComparatorFactory factory : binaryComparatorFactories) {
+            comparators[slot++] = factory.createBinaryComparator();
+        }
+        return new JoinMultiComparator(comparators, keysLeft, keysRight);
+    }
+}
+
+/**
+ * {@link ITuplePairComparator} used by the optimized hybrid hash join.
+ * Tuples are compared key pair by key pair: the i-th binary comparator is applied to field
+ * {@code keysLeft[i]} of the first tuple and field {@code keysRight[i]} of the second tuple.
+ */
+class JoinMultiComparator implements ITuplePairComparator {
+    private final IBinaryComparator[] binaryComparators;
+    private final int[] keysLeft;
+    private final int[] keysRight;
+
+    /**
+     * Stores the given arrays by reference; no copies are made.
+     *
+     * @param bComparator one binary comparator per key pair
+     * @param keysLeft field indexes read through the first accessor
+     * @param keysRight field indexes read through the second accessor
+     */
+    public JoinMultiComparator(IBinaryComparator[] bComparator, int[] keysLeft, int[] keysRight) {
+        this.keysLeft = keysLeft;
+        this.keysRight = keysRight;
+        this.binaryComparators = bComparator;
+    }
+
+    /**
+     * Compares tuple {@code tIndex0} of {@code accessor0} with tuple {@code tIndex1} of
+     * {@code accessor1} on the configured key fields.
+     *
+     * @return the first non-zero result produced by a key comparator, or 0 when every key
+     *         pair compares equal
+     */
+    @Override
+    public int compare(IFrameTupleAccessor accessor0, int tIndex0, IFrameTupleAccessor accessor1, int tIndex1) {
+        // Per-tuple base offset: tuple start plus the field-slot length reported by the accessor.
+        final int base0 = accessor0.getTupleStartOffset(tIndex0) + accessor0.getFieldSlotsLength();
+        final int base1 = accessor1.getTupleStartOffset(tIndex1) + accessor1.getFieldSlotsLength();
+
+        for (int k = 0; k < binaryComparators.length; k++) {
+            final int leftField = keysLeft[k];
+            final int leftStart = accessor0.getFieldStartOffset(tIndex0, leftField);
+            final int leftLength = accessor0.getFieldEndOffset(tIndex0, leftField) - leftStart;
+
+            final int rightField = keysRight[k];
+            final int rightStart = accessor1.getFieldStartOffset(tIndex1, rightField);
+            final int rightLength = accessor1.getFieldEndOffset(tIndex1, rightField) - rightStart;
+
+            // Field start offsets are added to the base offset to address the backing array.
+            final int result = binaryComparators[k].compare(accessor0.getBuffer().array(), leftStart + base0,
+                    leftLength, accessor1.getBuffer().array(), rightStart + base1, rightLength);
+            if (result != 0) {
+                return result;
+            }
+        }
+        return 0;
+    }
+}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index 8cbd2d8..d153f90 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -44,6 +44,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -56,8 +57,8 @@
import edu.uci.ics.hyracks.dataflow.std.join.NestedLoopJoinOperatorDescriptor;
/**
- * Left input is broadcast and preserves its local properties.
- * Right input can be partitioned in any way.
+ * Left input is broadcast and preserves its local properties. Right input can
+ * be partitioned in any way.
*/
public class NLJoinPOperator extends AbstractJoinPOperator {
@@ -97,7 +98,7 @@
pp = pv1.getPartitioningProperty();
}
} else {
- pp = IPartitioningProperty.UNPARTITIONED;
+ pp = IPartitioningProperty.UNPARTITIONED;
}
List<ILocalStructuralProperty> localProps = new LinkedList<ILocalStructuralProperty>();
@@ -122,7 +123,8 @@
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
- RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
conditionInputSchemas[0] = propagatedSchema;
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
@@ -135,10 +137,19 @@
switch (kind) {
case INNER: {
- opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize);
+ opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, false,
+ null);
break;
}
- case LEFT_OUTER:
+ case LEFT_OUTER: {
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[inputSchemas[1].getSize()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = context.getNullWriterFactory();
+ }
+ opDesc = new NestedLoopJoinOperatorDescriptor(spec, comparatorFactory, recDescriptor, memSize, true,
+ nullWriterFactories);
+ break;
+ }
default: {
throw new NotImplementedException();
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
index 7e3c935..8875f6c 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
@@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.LinkedList;
+import java.util.List;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
@@ -69,5 +70,9 @@
ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
throw new NotImplementedException();
}
+
+ /**
+ * Returns the partitioning fields of this operator.
+ *
+ * @return the {@code partitioningFields} reference itself, not a copy
+ */
+ public List<OrderColumn> getPartitioningFields() {
+ return partitioningFields;
+ }
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index a94c78e..fc0c433 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -30,8 +30,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -48,7 +50,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -164,6 +165,13 @@
}
+ /**
+ * Renders a {@link DistributeResultOperator}: whatever {@code addIndent} writes for the given
+ * indent, followed by the literal text "distribute result " and the operator's expressions.
+ *
+ * @param op operator whose {@code getExpressions()} value is appended
+ * @param indent indentation argument forwarded to {@code addIndent}
+ * @return the contents of the local buffer
+ */
@Override
+ public String visitDistributeResultOperator(DistributeResultOperator op, Integer indent) {
+ StringBuilder buffer = new StringBuilder();
+ // The appends go to the value returned by addIndent; presumably that is the same buffer - confirm.
+ addIndent(buffer, indent).append("distribute result ").append(op.getExpressions());
+ return buffer.toString();
+ }
+
+ @Override
public String visitWriteResultOperator(WriteResultOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("load ").append(op.getDataSource()).append(" from ")
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 6b5949e..23dac2a 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -20,8 +20,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -37,7 +39,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -97,6 +98,8 @@
public R visitWriteOperator(WriteOperator op, T arg) throws AlgebricksException;
+ /**
+ * Visits a {@link DistributeResultOperator}.
+ *
+ * @param op the operator being visited
+ * @param arg visitor-specific argument
+ * @return visitor-specific result
+ * @throws AlgebricksException if the implementation fails while handling the operator
+ */
+ public R visitDistributeResultOperator(DistributeResultOperator op, T arg) throws AlgebricksException;
+
public R visitWriteResultOperator(WriteResultOperator op, T arg) throws AlgebricksException;
public R visitInsertDeleteOperator(InsertDeleteOperator op, T tag) throws AlgebricksException;
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 22a1a81..365d1a5 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
@@ -42,151 +43,167 @@
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
public class JobGenContext {
- private final IOperatorSchema outerFlowSchema;
- private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
- private final ISerializerDeserializerProvider serializerDeserializerProvider;
- private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
- private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
- private final IPrinterFactoryProvider printerFactoryProvider;
- private final ITypeTraitProvider typeTraitProvider;
- private final IMetadataProvider<?, ?> metadataProvider;
- private final INullWriterFactory nullWriterFactory;
- private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
- private final Object appContext;
- private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
- private final IBinaryIntegerInspectorFactory integerInspectorFactory;
- private final IExpressionRuntimeProvider expressionRuntimeProvider;
- private final IExpressionTypeComputer expressionTypeComputer;
- private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
- private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
- private final int frameSize;
- private AlgebricksPartitionConstraint clusterLocations;
- private int varCounter;
- private final ITypingContext typingContext;
+ private final IOperatorSchema outerFlowSchema;
+ private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
+ private final ISerializerDeserializerProvider serializerDeserializerProvider;
+ private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
+ private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
+ private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
+ private final IPrinterFactoryProvider printerFactoryProvider;
+ private final ITypeTraitProvider typeTraitProvider;
+ private final IMetadataProvider<?, ?> metadataProvider;
+ private final INullWriterFactory nullWriterFactory;
+ private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
+ private final Object appContext;
+ private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
+ private final IBinaryIntegerInspectorFactory integerInspectorFactory;
+ private final IExpressionRuntimeProvider expressionRuntimeProvider;
+ private final IExpressionTypeComputer expressionTypeComputer;
+ private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
+ private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
+ private final int frameSize;
+ private AlgebricksPartitionConstraint clusterLocations;
+ private int varCounter;
+ private final ITypingContext typingContext;
- public JobGenContext(IOperatorSchema outerFlowSchema, IMetadataProvider<?, ?> metadataProvider, Object appContext,
- ISerializerDeserializerProvider serializerDeserializerProvider,
- IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
- IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider,
- IBinaryBooleanInspectorFactory booleanInspectorFactory,
- IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider,
- INullWriterFactory nullWriterFactory,
- INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
- IExpressionRuntimeProvider expressionRuntimeProvider, IExpressionTypeComputer expressionTypeComputer,
- INullableTypeComputer nullableTypeComputer, ITypingContext typingContext,
- IExpressionEvalSizeComputer expressionEvalSizeComputer,
- IPartialAggregationTypeComputer partialAggregationTypeComputer, int frameSize,
- AlgebricksPartitionConstraint clusterLocations) {
- this.outerFlowSchema = outerFlowSchema;
- this.metadataProvider = metadataProvider;
- this.appContext = appContext;
- this.serializerDeserializerProvider = serializerDeserializerProvider;
- this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
- this.comparatorFactoryProvider = comparatorFactoryProvider;
- this.typeTraitProvider = typeTraitProvider;
- this.booleanInspectorFactory = booleanInspectorFactory;
- this.integerInspectorFactory = integerInspectorFactory;
- this.printerFactoryProvider = printerFactoryProvider;
- this.clusterLocations = clusterLocations;
- this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
- this.nullWriterFactory = nullWriterFactory;
- this.expressionRuntimeProvider = expressionRuntimeProvider;
- this.expressionTypeComputer = expressionTypeComputer;
- this.typingContext = typingContext;
- this.expressionEvalSizeComputer = expressionEvalSizeComputer;
- this.partialAggregationTypeComputer = partialAggregationTypeComputer;
- this.frameSize = frameSize;
- this.varCounter = 0;
- }
+ public JobGenContext(
+ IOperatorSchema outerFlowSchema,
+ IMetadataProvider<?, ?> metadataProvider,
+ Object appContext,
+ ISerializerDeserializerProvider serializerDeserializerProvider,
+ IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
+ IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider,
+ IBinaryComparatorFactoryProvider comparatorFactoryProvider,
+ ITypeTraitProvider typeTraitProvider,
+ IBinaryBooleanInspectorFactory booleanInspectorFactory,
+ IBinaryIntegerInspectorFactory integerInspectorFactory,
+ IPrinterFactoryProvider printerFactoryProvider,
+ INullWriterFactory nullWriterFactory,
+ INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
+ IExpressionRuntimeProvider expressionRuntimeProvider,
+ IExpressionTypeComputer expressionTypeComputer,
+ INullableTypeComputer nullableTypeComputer,
+ ITypingContext typingContext,
+ IExpressionEvalSizeComputer expressionEvalSizeComputer,
+ IPartialAggregationTypeComputer partialAggregationTypeComputer,
+ int frameSize, AlgebricksPartitionConstraint clusterLocations) {
+ this.outerFlowSchema = outerFlowSchema;
+ this.metadataProvider = metadataProvider;
+ this.appContext = appContext;
+ this.serializerDeserializerProvider = serializerDeserializerProvider;
+ this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
+ this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
+ this.comparatorFactoryProvider = comparatorFactoryProvider;
+ this.typeTraitProvider = typeTraitProvider;
+ this.booleanInspectorFactory = booleanInspectorFactory;
+ this.integerInspectorFactory = integerInspectorFactory;
+ this.printerFactoryProvider = printerFactoryProvider;
+ this.clusterLocations = clusterLocations;
+ this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
+ this.nullWriterFactory = nullWriterFactory;
+ this.expressionRuntimeProvider = expressionRuntimeProvider;
+ this.expressionTypeComputer = expressionTypeComputer;
+ this.typingContext = typingContext;
+ this.expressionEvalSizeComputer = expressionEvalSizeComputer;
+ this.partialAggregationTypeComputer = partialAggregationTypeComputer;
+ this.frameSize = frameSize;
+ this.varCounter = 0;
+ }
- public IOperatorSchema getOuterFlowSchema() {
- return outerFlowSchema;
- }
+ public IOperatorSchema getOuterFlowSchema() {
+ return outerFlowSchema;
+ }
- public AlgebricksPartitionConstraint getClusterLocations() {
- return clusterLocations;
- }
+ public AlgebricksPartitionConstraint getClusterLocations() {
+ return clusterLocations;
+ }
- public IMetadataProvider<?, ?> getMetadataProvider() {
- return metadataProvider;
- }
+ public IMetadataProvider<?, ?> getMetadataProvider() {
+ return metadataProvider;
+ }
- public Object getAppContext() {
- return appContext;
- }
+ public Object getAppContext() {
+ return appContext;
+ }
- public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
- return serializerDeserializerProvider;
- }
+ public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
+ return serializerDeserializerProvider;
+ }
- public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
- return hashFunctionFactoryProvider;
- }
+ public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
+ return hashFunctionFactoryProvider;
+ }
- public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
- return comparatorFactoryProvider;
- }
+ /**
+ * Returns the hash-function-family provider that was passed to the constructor.
+ *
+ * @return the {@code hashFunctionFamilyProvider} field
+ */
+ public IBinaryHashFunctionFamilyProvider getBinaryHashFunctionFamilyProvider() {
+ return hashFunctionFamilyProvider;
+ }
- public ITypeTraitProvider getTypeTraitProvider() {
- return typeTraitProvider;
- }
+ public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
+ return comparatorFactoryProvider;
+ }
- public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
- return booleanInspectorFactory;
- }
+ public ITypeTraitProvider getTypeTraitProvider() {
+ return typeTraitProvider;
+ }
- public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
- return integerInspectorFactory;
- }
+ public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
+ return booleanInspectorFactory;
+ }
- public IPrinterFactoryProvider getPrinterFactoryProvider() {
- return printerFactoryProvider;
- }
+ public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
+ return integerInspectorFactory;
+ }
- public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
- return expressionRuntimeProvider;
- }
+ public IPrinterFactoryProvider getPrinterFactoryProvider() {
+ return printerFactoryProvider;
+ }
- public IOperatorSchema getSchema(ILogicalOperator op) {
- return schemaMap.get(op);
- }
+ public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
+ return expressionRuntimeProvider;
+ }
- public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
- schemaMap.put(op, schema);
- }
+ public IOperatorSchema getSchema(ILogicalOperator op) {
+ return schemaMap.get(op);
+ }
- public LogicalVariable createNewVar() {
- varCounter++;
- LogicalVariable var = new LogicalVariable(-varCounter);
- return var;
- }
+ public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
+ schemaMap.put(op, schema);
+ }
- public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env) throws AlgebricksException {
- return expressionTypeComputer.getType(expr, typingContext.getMetadataProvider(), env);
- }
+ public LogicalVariable createNewVar() {
+ varCounter++;
+ LogicalVariable var = new LogicalVariable(-varCounter);
+ return var;
+ }
- public INullWriterFactory getNullWriterFactory() {
- return nullWriterFactory;
- }
+ public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env)
+ throws AlgebricksException {
+ return expressionTypeComputer.getType(expr,
+ typingContext.getMetadataProvider(), env);
+ }
- public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
- return normalizedKeyComputerFactoryProvider;
- }
+ public INullWriterFactory getNullWriterFactory() {
+ return nullWriterFactory;
+ }
- public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
- return expressionEvalSizeComputer;
- }
+ public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
+ return normalizedKeyComputerFactoryProvider;
+ }
- public int getFrameSize() {
- return frameSize;
- }
+ public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
+ return expressionEvalSizeComputer;
+ }
- public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
- return partialAggregationTypeComputer;
- }
+ public int getFrameSize() {
+ return frameSize;
+ }
- public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
- return typingContext.getOutputTypeEnvironment(op);
- }
+ public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
+ return partialAggregationTypeComputer;
+ }
+
+ public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
+ return typingContext.getOutputTypeEnvironment(op);
+ }
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
index 790fb93..530d19c 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
@@ -31,6 +32,7 @@
import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
@@ -42,8 +44,8 @@
@SuppressWarnings("rawtypes")
public static RecordDescriptor mkRecordDescriptor(IVariableTypeEnvironment env, IOperatorSchema opSchema,
- JobGenContext context) throws AlgebricksException {
- ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
+ JobGenContext context) throws AlgebricksException {
+ ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
ITypeTraits[] typeTraits = new ITypeTraits[opSchema.getSize()];
ISerializerDeserializerProvider sdp = context.getSerializerDeserializerProvider();
ITypeTraitProvider ttp = context.getTypeTraitProvider();
@@ -59,7 +61,7 @@
}
return new RecordDescriptor(fields, typeTraits);
}
-
+
public static IPrinterFactory[] mkPrinterFactories(IOperatorSchema opSchema, IVariableTypeEnvironment env,
JobGenContext context, int[] printColumns) throws AlgebricksException {
IPrinterFactory[] pf = new IPrinterFactory[printColumns.length];
@@ -94,7 +96,20 @@
}
return funFactories;
}
-
+
+    /**
+     * Looks up one {@link IBinaryHashFunctionFamily} per variable, in the iteration order of
+     * {@code varLogical}, using the provider obtained from {@code context}.
+     * Entries are stored exactly as the provider returns them; HybridHashJoinPOperator checks
+     * the resulting array for {@code null} entries before choosing the optimized join.
+     *
+     * @param varLogical variables whose types select the hash function families
+     * @param env type environment used to resolve each variable's type
+     * @param context job generation context supplying the family provider
+     * @return an array with one entry per variable
+     * @throws AlgebricksException if a type lookup or provider call fails
+     */
+    public static IBinaryHashFunctionFamily[] variablesToBinaryHashFunctionFamilies(
+            Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
+            throws AlgebricksException {
+        final IBinaryHashFunctionFamily[] families = new IBinaryHashFunctionFamily[varLogical.size()];
+        final IBinaryHashFunctionFamilyProvider provider = context.getBinaryHashFunctionFamilyProvider();
+        int next = 0;
+        for (LogicalVariable variable : varLogical) {
+            families[next] = provider.getBinaryHashFunctionFamily(env.getVarType(variable));
+            next++;
+        }
+        return families;
+    }
+
public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(
Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
throws AlgebricksException {
@@ -107,15 +122,14 @@
}
return compFactories;
}
-
- public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(
- List<LogicalVariable> varLogical, int start, int size, IVariableTypeEnvironment env, JobGenContext context)
- throws AlgebricksException {
+
+ public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(List<LogicalVariable> varLogical,
+ int start, int size, IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[size];
IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider();
for (int i = 0; i < size; i++) {
- Object type = env.getVarType(varLogical.get(start + i));
- compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, true);
+ Object type = env.getVarType(varLogical.get(start + i));
+ compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, true);
}
return compFactories;
}
@@ -144,15 +158,14 @@
}
return typeTraits;
}
-
- public static ITypeTraits[] variablesToTypeTraits(
- List<LogicalVariable> varLogical, int start, int size, IVariableTypeEnvironment env, JobGenContext context)
- throws AlgebricksException {
+
+ public static ITypeTraits[] variablesToTypeTraits(List<LogicalVariable> varLogical, int start, int size,
+ IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
ITypeTraits[] typeTraits = new ITypeTraits[size];
ITypeTraitProvider typeTraitProvider = context.getTypeTraitProvider();
for (int i = 0; i < size; i++) {
- Object type = env.getVarType(varLogical.get(start + i));
- typeTraits[i] = typeTraitProvider.getTypeTrait(type);
+ Object type = env.getVarType(varLogical.get(start + i));
+ typeTraits[i] = typeTraitProvider.getTypeTrait(type);
}
return typeTraits;
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index bbe8fb3..63a6852 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -54,6 +54,8 @@
operatorVisitedToParents.clear();
builder.buildSpec(rootOps);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+ // Do not do activity cluster planning because it is slow on large clusters
+ spec.setUseConnectorPolicyForScheduling(false);
return spec;
}
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 7c63e01..738fc7f 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -135,6 +135,7 @@
/*
* returns true if op1 and op2 have already been compared
*/
+ @Override
public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2) {
HashSet<ILogicalOperator> ops = alreadyCompared.get(op1);
if (ops == null) {
@@ -151,6 +152,11 @@
}
}
}
+
+ @Override
+ public void removeFromAlreadyCompared(ILogicalOperator op1) {
+ alreadyCompared.remove(op1);
+ }
public void addNotToBeInlinedVar(LogicalVariable var) {
notToBeInlinedVars.add(var);
diff --git a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index 9ce910b..fc6c198 100644
--- a/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/fullstack/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -17,8 +17,8 @@
public PhysicalOptimizationConfig() {
int frameSize = 32768;
setInt(FRAMESIZE, frameSize);
- setInt(MAX_FRAMES_EXTERNAL_SORT, (int) (((long) 512 * MB) / frameSize));
- setInt(MAX_FRAMES_EXTERNAL_GROUP_BY, (int) (((long) 256 * MB) / frameSize));
+ setInt(MAX_FRAMES_EXTERNAL_SORT, (int) (((long) 32 * MB) / frameSize));
+ setInt(MAX_FRAMES_EXTERNAL_GROUP_BY, (int) (((long) 32 * MB) / frameSize));
// use http://www.rsok.com/~jrm/printprimes.html to find prime numbers
setInt(DEFAULT_HASH_GROUP_TABLE_SIZE, 10485767);
diff --git a/fullstack/algebricks/algebricks-data/pom.xml b/fullstack/algebricks/algebricks-data/pom.xml
index 3d927d9..9536416 100644
--- a/fullstack/algebricks/algebricks-data/pom.xml
+++ b/fullstack/algebricks/algebricks-data/pom.xml
@@ -16,8 +16,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
</plugins>
diff --git a/fullstack/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryHashFunctionFamilyProvider.java b/fullstack/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryHashFunctionFamilyProvider.java
new file mode 100644
index 0000000..8a992b3
--- /dev/null
+++ b/fullstack/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IBinaryHashFunctionFamilyProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.data;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+
+public interface IBinaryHashFunctionFamilyProvider {
+
+ public IBinaryHashFunctionFamily getBinaryHashFunctionFamily(Object type)
+ throws AlgebricksException;
+}
diff --git a/fullstack/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java b/fullstack/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
new file mode 100644
index 0000000..5e62612
--- /dev/null
+++ b/fullstack/algebricks/algebricks-data/src/main/java/edu/uci/ics/hyracks/algebricks/data/IResultSerializerFactoryProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.data;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
+
+public interface IResultSerializerFactoryProvider extends Serializable {
+ /**
+ * Returns a result serializer factory
+ *
+ * @param fields
+ * - The positions of the fields, in the order they should be written in the output.
+ * @param printerFactories
+ * - A printer factory array to print the tuple containing different fields.
+ * @param writerFactory
+ * - A writer factory to write the serialized data to the print stream.
+ * @param inputRecordDesc
+ * - The record descriptor describing the input frame to be serialized.
+ * @return A result serializer factory.
+ */
+ public IResultSerializerFactory getAqlResultSerializerFactoryProvider(int[] fields,
+ IPrinterFactory[] printerFactories, IAWriterFactory writerFactory);
+}
diff --git a/fullstack/algebricks/algebricks-examples/piglet-example/pom.xml b/fullstack/algebricks/algebricks-examples/piglet-example/pom.xml
index 954938a..ca7467b 100644
--- a/fullstack/algebricks/algebricks-examples/piglet-example/pom.xml
+++ b/fullstack/algebricks/algebricks-examples/piglet-example/pom.xml
@@ -16,8 +16,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
<plugin>
diff --git a/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java b/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
index d105759..2981157 100644
--- a/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
+++ b/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
@@ -127,9 +127,9 @@
}
});
builder.setTypeTraitProvider(new ITypeTraitProvider() {
- public ITypeTraits getTypeTrait(Object type) {
- return null;
- }
+ public ITypeTraits getTypeTrait(Object type) {
+ return null;
+ }
});
builder.setPrinterProvider(PigletPrinterFactoryProvider.INSTANCE);
builder.setExpressionRuntimeProvider(new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(
diff --git a/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index d678803..15b290e 100644
--- a/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -145,6 +145,13 @@
}
@Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
+ int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
+ JobSpecification spec) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
IDataSource<String> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
LogicalVariable payLoadVar, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
diff --git a/fullstack/algebricks/algebricks-rewriter/pom.xml b/fullstack/algebricks/algebricks-rewriter/pom.xml
index 41979d3..7968773 100644
--- a/fullstack/algebricks/algebricks-rewriter/pom.xml
+++ b/fullstack/algebricks/algebricks-rewriter/pom.xml
@@ -16,8 +16,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
</plugins>
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
index 4f6699a..3daf85d 100644
--- a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexJoinInferenceRule.java
@@ -69,7 +69,7 @@
VariableUtilities.getUsedVariables(op, varsUsedInUnnest);
HashSet<LogicalVariable> producedInSubplan = new HashSet<LogicalVariable>();
- VariableUtilities.getLiveVariables(subplan, producedInSubplan);
+ VariableUtilities.getProducedVariables(subplan, producedInSubplan);
if (!producedInSubplan.containsAll(varsUsedInUnnest)) {
return false;
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
index 4521d1a..fa5000e 100644
--- a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
@@ -149,9 +149,15 @@
// Trivially joinable.
return true;
}
- if (!belowSecondUnnest && op.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
- // Bail on subplan.
- return false;
+ if (!belowSecondUnnest) {
+ // Bail on the following operators.
+ switch (op.getOperatorTag()) {
+ case AGGREGATE:
+ case SUBPLAN:
+ case GROUP:
+ case UNNEST_MAP:
+ return false;
+ }
}
switch (op.getOperatorTag()) {
case UNNEST:
@@ -211,7 +217,8 @@
for (LogicalVariable producedVar : producedVars) {
if (outerUsedVars.contains(producedVar)) {
outerMatches++;
- } else if (innerUsedVars.contains(producedVar)) {
+ }
+ if (innerUsedVars.contains(producedVar)) {
innerMatches++;
}
}
@@ -221,24 +228,30 @@
// All produced vars used by outer partition.
outerOps.add(op);
targetUsedVars = outerUsedVars;
- } else if (innerMatches == producedVars.size() && !producedVars.isEmpty()) {
+ }
+ if (innerMatches == producedVars.size() && !producedVars.isEmpty()) {
// All produced vars used by inner partition.
innerOps.add(op);
targetUsedVars = innerUsedVars;
- } else if (innerMatches == 0 && outerMatches == 0) {
+ }
+ if (innerMatches == 0 && outerMatches == 0) {
// Op produces variables that are not used in the part of the plan we've seen (or it doesn't produce any vars).
// Try to figure out where it belongs by analyzing the used variables.
List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
VariableUtilities.getUsedVariables(op, usedVars);
for (LogicalVariable usedVar : usedVars) {
+ boolean canBreak = false;
if (outerUsedVars.contains(usedVar)) {
outerOps.add(op);
targetUsedVars = outerUsedVars;
- break;
+ canBreak = true;
}
if (innerUsedVars.contains(usedVar)) {
innerOps.add(op);
targetUsedVars = innerUsedVars;
+ canBreak = true;
+ }
+ if (canBreak) {
break;
}
}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
new file mode 100644
index 0000000..f017e0f
--- /dev/null
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
@@ -0,0 +1,439 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Factors out common sub-expressions by assigning them to variables, and replacing the common sub-expressions with references to those variables.
+ *
+ * Preconditions/Assumptions:
+ * Assumes no projects are in the plan. This rule ignores variable reference expressions and constants (other rules deal with those separately).
+ *
+ * Postconditions/Examples:
+ * Plan with extracted sub-expressions. Generates one assign operator per extracted expression.
+ *
+ * Example 1 - Simple Arithmetic Example (simplified)
+ *
+ * Before plan:
+ * assign [$$1] <- [5 + 6 - 10]
+ * assign [$$0] <- [5 + 6 + 30]
+ *
+ * After plan:
+ * assign [$$1] <- [$$5 - 10]
+ * assign [$$0] <- [$$5 + 30]
+ * assign [$$5] <- [5 + 6]
+ *
+ * Example 2 - Cleaning up 'Distinct By' (simplified)
+ *
+ * Before plan: (notice how $$0 is not live after the distinct)
+ * assign [$$3] <- [field-access($$0, 1)]
+ * distinct ([%0->$$5])
+ * assign [$$5] <- [field-access($$0, 1)]
+ * unnest $$0 <- [scan-dataset]
+ *
+ * After plan: (notice how the issue of $$0 is fixed)
+ * assign [$$3] <- [$$5]
+ * distinct ([$$5])
+ * assign [$$5] <- [field-access($$0, 1)]
+ * unnest $$0 <- [scan-dataset]
+ *
+ * Example 3 - Pulling Common Expressions Above Joins (simplified)
+ *
+ * Before plan:
+ * assign [$$9] <- funcZ(funcY($$8))
+ * join (funcX(funcY($$8)))
+ *
+ * After plan:
+ * assign [$$9] <- funcZ($$10)
+ * select (funcX($$10))
+ * assign [$$10] <- [funcY($$8)]
+ * join (TRUE)
+ */
+public class ExtractCommonExpressionsRule implements IAlgebraicRewriteRule {
+
+ private final List<ILogicalExpression> originalAssignExprs = new ArrayList<ILogicalExpression>();
+
+ private final CommonExpressionSubstitutionVisitor substVisitor = new CommonExpressionSubstitutionVisitor();
+ private final Map<ILogicalExpression, ExprEquivalenceClass> exprEqClassMap = new HashMap<ILogicalExpression, ExprEquivalenceClass>();
+
+ // Set of operators for which common subexpression elimination should not be performed.
+ private static final Set<LogicalOperatorTag> ignoreOps = new HashSet<LogicalOperatorTag>();
+ static {
+ ignoreOps.add(LogicalOperatorTag.UNNEST);
+ ignoreOps.add(LogicalOperatorTag.UNNEST_MAP);
+ ignoreOps.add(LogicalOperatorTag.ORDER);
+ ignoreOps.add(LogicalOperatorTag.PROJECT);
+ ignoreOps.add(LogicalOperatorTag.AGGREGATE);
+ ignoreOps.add(LogicalOperatorTag.RUNNINGAGGREGATE);
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ exprEqClassMap.clear();
+ substVisitor.setContext(context);
+ boolean modified = removeCommonExpressions(opRef, context);
+ if (modified) {
+ context.computeAndSetTypeEnvironmentForOperator(opRef.getValue());
+ }
+ return modified;
+ }
+
+ private void updateEquivalenceClassMap(LogicalVariable lhs, Mutable<ILogicalExpression> rhsExprRef, ILogicalExpression rhsExpr, ILogicalOperator op) {
+ ExprEquivalenceClass exprEqClass = exprEqClassMap.get(rhsExpr);
+ if (exprEqClass == null) {
+ exprEqClass = new ExprEquivalenceClass(op, rhsExprRef);
+ exprEqClassMap.put(rhsExpr, exprEqClass);
+ }
+ exprEqClass.setVariable(lhs);
+ }
+
+ private boolean removeCommonExpressions(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ if (context.checkIfInDontApplySet(this, opRef.getValue())) {
+ return false;
+ }
+
+ boolean modified = false;
+ // Recurse into children.
+ for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+ if (removeCommonExpressions(inputOpRef, context)) {
+ modified = true;
+ }
+ }
+
+ // TODO: Deal with replicate properly. Currently, we just clear the expr equivalence map, since we want to avoid incorrect expression replacement
+ // (the resulting new variables should be assigned live below a replicate).
+ if (op.getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+ exprEqClassMap.clear();
+ return modified;
+ }
+ // Exclude these operators.
+ if (ignoreOps.contains(op.getOperatorTag())) {
+ return modified;
+ }
+
+ // Remember a copy of the original assign expressions, so we can add them to the equivalence class map
+ // after replacing expressions within the assign operator itself.
+ if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+ AssignOperator assignOp = (AssignOperator) op;
+ originalAssignExprs.clear();
+ int numVars = assignOp.getVariables().size();
+ for (int i = 0; i < numVars; i++) {
+ Mutable<ILogicalExpression> exprRef = assignOp.getExpressions().get(i);
+ ILogicalExpression expr = exprRef.getValue();
+ originalAssignExprs.add(expr.cloneExpression());
+ }
+ }
+
+ // Perform common subexpression elimination.
+ substVisitor.setOperator(op);
+ if (op.acceptExpressionTransform(substVisitor)) {
+ modified = true;
+ }
+
+ // Update equivalence class map.
+ if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+ AssignOperator assignOp = (AssignOperator) op;
+ int numVars = assignOp.getVariables().size();
+ for (int i = 0; i < numVars; i++) {
+ Mutable<ILogicalExpression> exprRef = assignOp.getExpressions().get(i);
+ ILogicalExpression expr = exprRef.getValue();
+ if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE
+ || expr.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+ continue;
+ }
+ // Update equivalence class map.
+ LogicalVariable lhs = assignOp.getVariables().get(i);
+ updateEquivalenceClassMap(lhs, exprRef, exprRef.getValue(), op);
+
+ // Update equivalence class map with original assign expression.
+ updateEquivalenceClassMap(lhs, exprRef, originalAssignExprs.get(i), op);
+ }
+ }
+
+ // TODO: For now do not perform replacement in nested plans
+ // due to the complication of figuring out whether the firstOp in an equivalence class is within a subplan,
+ // and the resulting variable will not be visible to the outside.
+ // Since subplans should be eliminated in most cases, this behavior is acceptable for now.
+ /*
+ if (op.hasNestedPlans()) {
+ AbstractOperatorWithNestedPlans opWithNestedPlan = (AbstractOperatorWithNestedPlans) op;
+ for (ILogicalPlan nestedPlan : opWithNestedPlan.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
+ if (removeCommonExpressions(rootRef, context)) {
+ modified = true;
+ }
+ }
+ }
+ }
+ */
+
+ if (modified) {
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ context.addToDontApplySet(this, op);
+ }
+ return modified;
+ }
+
+ private class CommonExpressionSubstitutionVisitor implements ILogicalExpressionReferenceTransform {
+
+ private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+ private final List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+ private IOptimizationContext context;
+ private ILogicalOperator op;
+
+ public void setContext(IOptimizationContext context) {
+ this.context = context;
+ }
+
+ public void setOperator(ILogicalOperator op) throws AlgebricksException {
+ this.op = op;
+ liveVars.clear();
+ usedVars.clear();
+ }
+
+ @Override
+ public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+ if (liveVars.isEmpty() && usedVars.isEmpty()) {
+ VariableUtilities.getLiveVariables(op, liveVars);
+ VariableUtilities.getUsedVariables(op, usedVars);
+ }
+
+ AbstractLogicalExpression expr = (AbstractLogicalExpression) exprRef.getValue();
+ boolean modified = false;
+ ExprEquivalenceClass exprEqClass = exprEqClassMap.get(expr);
+ if (exprEqClass != null) {
+ // Replace common subexpression with existing variable.
+ if (exprEqClass.variableIsSet()) {
+ // Check if the replacing variable is live at this op.
+ // However, if the op is already using variables that are not live, then a replacement may enable fixing the plan.
+ // This behavior is necessary to, e.g., properly deal with distinct by.
+ // Also just replace the expr if we are replacing common exprs from within the same operator.
+ if (liveVars.contains(exprEqClass.getVariable()) || !liveVars.containsAll(usedVars)
+ || op == exprEqClass.getFirstOperator()) {
+ exprRef.setValue(new VariableReferenceExpression(exprEqClass.getVariable()));
+ // Do not descend into children since this expr has been completely replaced.
+ return true;
+ }
+ } else {
+ if (assignCommonExpression(exprEqClass, expr)) {
+ exprRef.setValue(new VariableReferenceExpression(exprEqClass.getVariable()));
+ // Do not descend into children since this expr has been completely replaced.
+ return true;
+ }
+ }
+ } else {
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE
+ && expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+ exprEqClass = new ExprEquivalenceClass(op, exprRef);
+ exprEqClassMap.put(expr, exprEqClass);
+ }
+ }
+
+ // Descend into function arguments.
+ if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+ AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+ for (Mutable<ILogicalExpression> arg : funcExpr.getArguments()) {
+ if (transform(arg)) {
+ modified = true;
+ }
+ }
+ }
+ return modified;
+ }
+
+ private boolean assignCommonExpression(ExprEquivalenceClass exprEqClass, ILogicalExpression expr) throws AlgebricksException {
+ AbstractLogicalOperator firstOp = (AbstractLogicalOperator) exprEqClass.getFirstOperator();
+ Mutable<ILogicalExpression> firstExprRef = exprEqClass.getFirstExpression();
+ if (firstOp.getOperatorTag() == LogicalOperatorTag.INNERJOIN || firstOp.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
+ // Do not extract common expressions from within the same join operator.
+ if (firstOp == op) {
+ return false;
+ }
+ AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) firstOp;
+ Mutable<ILogicalExpression> joinCond = joinOp.getCondition();
+ ILogicalExpression enclosingExpr = getEnclosingExpression(joinCond, firstExprRef.getValue());
+ if (enclosingExpr == null) {
+ // No viable enclosing expression that we can pull out from the join.
+ return false;
+ }
+ // Place a Select operator beneath op that contains the enclosing expression.
+ SelectOperator selectOp = new SelectOperator(new MutableObject<ILogicalExpression>(enclosingExpr));
+ selectOp.getInputs().add(new MutableObject<ILogicalOperator>(op.getInputs().get(0).getValue()));
+ op.getInputs().get(0).setValue(selectOp);
+ // Set firstOp to be the select below op, since we want to assign the common subexpr there.
+ firstOp = (AbstractLogicalOperator) selectOp;
+ } else if (firstOp.getInputs().size() > 1) {
+ // Bail for any non-join operator with multiple inputs.
+ return false;
+ }
+ LogicalVariable newVar = context.newVar();
+ AssignOperator newAssign = new AssignOperator(newVar, new MutableObject<ILogicalExpression>(firstExprRef.getValue().cloneExpression()));
+ // Place assign below firstOp.
+ newAssign.getInputs().add(new MutableObject<ILogicalOperator>(firstOp.getInputs().get(0).getValue()));
+ newAssign.setExecutionMode(firstOp.getExecutionMode());
+ firstOp.getInputs().get(0).setValue(newAssign);
+ // Replace original expr with variable reference, and set var in expression equivalence class.
+ firstExprRef.setValue(new VariableReferenceExpression(newVar));
+ exprEqClass.setVariable(newVar);
+ context.computeAndSetTypeEnvironmentForOperator(newAssign);
+ context.computeAndSetTypeEnvironmentForOperator(firstOp);
+ return true;
+ }
+
+ private ILogicalExpression getEnclosingExpression(Mutable<ILogicalExpression> conditionExprRef, ILogicalExpression commonSubExpr) {
+ ILogicalExpression conditionExpr = conditionExprRef.getValue();
+ if (conditionExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ return null;
+ }
+ if (isEqJoinCondition(commonSubExpr)) {
+ // Do not eliminate the common expression if we could use it for an equi-join.
+ return null;
+ }
+ AbstractFunctionCallExpression conditionFuncExpr = (AbstractFunctionCallExpression) conditionExpr;
+ // Boolean expression that encloses the common subexpression.
+ ILogicalExpression enclosingBoolExpr = null;
+ // We are not dealing with arbitrarily nested and/or expressions here.
+ FunctionIdentifier funcIdent = conditionFuncExpr.getFunctionIdentifier();
+ if (funcIdent.equals(AlgebricksBuiltinFunctions.AND) || funcIdent.equals(AlgebricksBuiltinFunctions.OR)) {
+ Iterator<Mutable<ILogicalExpression>> argIter = conditionFuncExpr.getArguments().iterator();
+ while (argIter.hasNext()) {
+ Mutable<ILogicalExpression> argRef = argIter.next();
+ if (containsExpr(argRef.getValue(), commonSubExpr)) {
+ enclosingBoolExpr = argRef.getValue();
+ // Remove the enclosing expression from the argument list.
+ // We are going to pull it out into a new select operator.
+ argIter.remove();
+ break;
+ }
+ }
+ // If and/or only has a single argument left, pull it out and remove the and/or function.
+ if (conditionFuncExpr.getArguments().size() == 1) {
+ conditionExprRef.setValue(conditionFuncExpr.getArguments().get(0).getValue());
+ }
+ } else {
+ if (!containsExpr(conditionExprRef.getValue(), commonSubExpr)) {
+ return null;
+ }
+ enclosingBoolExpr = conditionFuncExpr;
+ // Replace the enclosing expression with TRUE.
+ conditionExprRef.setValue(ConstantExpression.TRUE);
+ }
+ return enclosingBoolExpr;
+ }
+ }
+
+ private boolean containsExpr(ILogicalExpression expr, ILogicalExpression searchExpr) {
+ if (expr == searchExpr) {
+ return true;
+ }
+ if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ return false;
+ }
+ AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+ for (Mutable<ILogicalExpression> argRef : funcExpr.getArguments()) {
+ if (containsExpr(argRef.getValue(), searchExpr)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isEqJoinCondition(ILogicalExpression expr) {
+ AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+ if (funcExpr.getFunctionIdentifier().equals(AlgebricksBuiltinFunctions.EQ)) {
+ ILogicalExpression arg1 = funcExpr.getArguments().get(0).getValue();
+ ILogicalExpression arg2 = funcExpr.getArguments().get(1).getValue();
+ if (arg1.getExpressionTag() == LogicalExpressionTag.VARIABLE
+ && arg2.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private final class ExprEquivalenceClass {
+ // First operator in which expression is used.
+ private final ILogicalOperator firstOp;
+
+ // Reference to expression in first op.
+ private final Mutable<ILogicalExpression> firstExprRef;
+
+ // Variable that this expression has been assigned to.
+ private LogicalVariable var;
+
+ public ExprEquivalenceClass(ILogicalOperator firstOp, Mutable<ILogicalExpression> firstExprRef) {
+ this.firstOp = firstOp;
+ this.firstExprRef = firstExprRef;
+ }
+
+ public ILogicalOperator getFirstOperator() {
+ return firstOp;
+ }
+
+ public Mutable<ILogicalExpression> getFirstExpression() {
+ return firstExprRef;
+ }
+
+ public void setVariable(LogicalVariable var) {
+ this.var = var;
+ }
+
+ public LogicalVariable getVariable() {
+ return var;
+ }
+
+ public boolean variableIsSet() {
+ return var != null;
+ }
+ }
+}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
new file mode 100644
index 0000000..df8ddda
--- /dev/null
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineSingleReferenceVariablesRule.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+
+/**
+ * Inlines variables that are referenced exactly once.
+ *
+ * Preconditions/Assumptions:
+ * Assumes no projects are in the plan.
+ *
+ * Example assuming variable $$3 is referenced exactly once.
+ *
+ * Before plan:
+ * select (funcA($$3))
+ * ...
+ * assign [$$3] <- [field-access($$0, 1)]
+ *
+ * After plan:
+ * select (funcA(field-access($$0, 1)))
+ * ...
+ * assign [] <- []
+ */
+public class InlineSingleReferenceVariablesRule extends InlineVariablesRule {
+
+    // For each variable, the operators recorded as using it (one entry per recorded use).
+    protected Map<LogicalVariable, List<ILogicalOperator>> usedVarsMap = new HashMap<LogicalVariable, List<ILogicalOperator>>();
+    protected List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+
+    @Override
+    protected void prepare(IOptimizationContext context) {
+        super.prepare(context);
+        usedVars.clear();
+        usedVarsMap.clear();
+    }
+
+    @Override
+    protected boolean performFinalAction() throws AlgebricksException {
+        boolean changed = false;
+        for (Map.Entry<LogicalVariable, List<ILogicalOperator>> usage : usedVarsMap.entrySet()) {
+            // Only variables with exactly one recorded use are candidates for inlining.
+            if (usage.getValue().size() != 1) {
+                continue;
+            }
+            ILogicalOperator soleUser = usage.getValue().get(0);
+            if (soleUser.requiresVariableReferenceExpressions()) {
+                continue;
+            }
+            inlineVisitor.setOperator(soleUser);
+            inlineVisitor.setTargetVariable(usage.getKey());
+            changed |= soleUser.acceptExpressionTransform(inlineVisitor);
+            inlineVisitor.setTargetVariable(null);
+        }
+        return changed;
+    }
+
+    @Override
+    protected boolean performBottomUpAction(AbstractLogicalOperator op) throws AlgebricksException {
+        usedVars.clear();
+        VariableUtilities.getUsedVariables(op, usedVars);
+        for (LogicalVariable usedVar : usedVars) {
+            List<ILogicalOperator> users = usedVarsMap.get(usedVar);
+            if (users == null) {
+                users = new ArrayList<ILogicalOperator>();
+                usedVarsMap.put(usedVar, users);
+            }
+            users.add(op);
+        }
+        return false;
+    }
+}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 8a79d81..7fed577a 100644
--- a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -15,330 +15,231 @@
package edu.uci.ics.hyracks.algebricks.rewriter.rules;
import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.EquivalenceClass;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+/**
+ * Replaces variable reference expressions with their assigned function-call expression where applicable
+ * (some variables are generated by datasources).
+ * Inlining variables may enable other optimizations by allowing selects and assigns to be moved
+ * (e.g., a select may be pushed into a join to enable an efficient physical join operator).
+ *
+ * Preconditions/Assumptions:
+ * Assumes no projects are in the plan. Only inlines variables whose assigned expression is a function call
+ * (i.e., this rule ignores right-hand side constants and other variable reference expressions).
+ *
+ * Postconditions/Examples:
+ * All qualifying variables have been inlined.
+ *
+ * Example (simplified):
+ *
+ * Before plan:
+ * select <- [$$1 < $$2 + $$0]
+ * assign [$$2] <- [funcZ() + $$0]
+ * assign [$$0, $$1] <- [funcX(), funcY()]
+ *
+ * After plan:
+ * select <- [funcY() < funcZ() + funcX() + funcX()]
+ * assign [$$2] <- [funcZ() + funcX()]
+ * assign [$$0, $$1] <- [funcX(), funcY()]
+ */
public class InlineVariablesRule implements IAlgebraicRewriteRule {
+ // Map of variables that could be replaced by their producing expression.
+ // Populated during the top-down sweep of the plan.
+ protected Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<LogicalVariable, ILogicalExpression>();
+
+ // Visitor for replacing variable reference expressions with their originating expression.
+ protected InlineVariablesVisitor inlineVisitor = new InlineVariablesVisitor(varAssignRhs);
+
+ // Set of FunctionIdentifiers that we should not inline.
+ protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<FunctionIdentifier>();
+
+ protected boolean hasRun = false;
+
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
return false;
}
@Override
- /**
- *
- * Does one big DFS sweep over the plan.
- *
- */
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ if (hasRun) {
+ return false;
+ }
if (context.checkIfInDontApplySet(this, opRef.getValue())) {
return false;
}
- VariableSubstitutionVisitor substVisitor = new VariableSubstitutionVisitor(false);
- VariableSubstitutionVisitor substVisitorForWrites = new VariableSubstitutionVisitor(true);
- substVisitor.setContext(context);
- substVisitorForWrites.setContext(context);
- Pair<Boolean, Boolean> bb = collectEqClassesAndRemoveRedundantOps(opRef, context, true,
- new LinkedList<EquivalenceClass>(), substVisitor, substVisitorForWrites);
- return bb.first;
+ prepare(context);
+ boolean modified = inlineVariables(opRef, context);
+ if (performFinalAction()) {
+ modified = true;
+ }
+ hasRun = true;
+ return modified;
+ }
+
+ protected void prepare(IOptimizationContext context) {
+ varAssignRhs.clear();
+ inlineVisitor.setContext(context);
+ }
+
+ protected boolean performBottomUpAction(AbstractLogicalOperator op) throws AlgebricksException {
+ // Only inline variables in operators that can deal with arbitrary expressions.
+ if (!op.requiresVariableReferenceExpressions()) {
+ inlineVisitor.setOperator(op);
+ return op.acceptExpressionTransform(inlineVisitor);
+ }
+ return false;
}
- private Pair<Boolean, Boolean> collectEqClassesAndRemoveRedundantOps(Mutable<ILogicalOperator> opRef,
- IOptimizationContext context, boolean first, List<EquivalenceClass> equivClasses,
- VariableSubstitutionVisitor substVisitor, VariableSubstitutionVisitor substVisitorForWrites)
+ protected boolean performFinalAction() throws AlgebricksException {
+ return false;
+ }
+
+ protected boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
- // if (context.checkIfInDontApplySet(this, opRef.getValue())) {
- // return false;
- // }
- if (op.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
- return new Pair<Boolean, Boolean>(false, false);
- }
- boolean modified = false;
- boolean ecChange = false;
- int cnt = 0;
- for (Mutable<ILogicalOperator> i : op.getInputs()) {
- boolean isOuterInputBranch = op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN && cnt == 1;
- List<EquivalenceClass> eqc = isOuterInputBranch ? new LinkedList<EquivalenceClass>() : equivClasses;
-
- Pair<Boolean, Boolean> bb = (collectEqClassesAndRemoveRedundantOps(i, context, false, eqc, substVisitor,
- substVisitorForWrites));
-
- if (bb.first) {
- modified = true;
- }
- if (bb.second) {
- ecChange = true;
- }
-
- if (isOuterInputBranch) {
- if (AlgebricksConfig.DEBUG) {
- AlgebricksConfig.ALGEBRICKS_LOGGER.finest("--- Equivalence classes for inner branch of outer op.: "
- + eqc + "\n");
- }
- for (EquivalenceClass ec : eqc) {
- if (!ec.representativeIsConst()) {
- equivClasses.add(ec);
- }
- }
- }
-
- ++cnt;
- }
- if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans n = (AbstractOperatorWithNestedPlans) op;
- List<EquivalenceClass> eqc = equivClasses;
- if (n.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
- eqc = new LinkedList<EquivalenceClass>();
- } else {
- eqc = equivClasses;
- }
- for (ILogicalPlan p : n.getNestedPlans()) {
- for (Mutable<ILogicalOperator> r : p.getRoots()) {
- Pair<Boolean, Boolean> bb = collectEqClassesAndRemoveRedundantOps(r, context, false, eqc,
- substVisitor, substVisitorForWrites);
- if (bb.first) {
- modified = true;
- }
- if (bb.second) {
- ecChange = true;
- }
- }
- }
- }
- // we assume a variable is assigned a value only once
+
+ // Update mapping from variables to expressions during top-down traversal.
if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
- AssignOperator a = (AssignOperator) op;
- ILogicalExpression rhs = a.getExpressions().get(0).getValue();
- if (rhs.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- LogicalVariable varLeft = a.getVariables().get(0);
- VariableReferenceExpression varRef = (VariableReferenceExpression) rhs;
- LogicalVariable varRight = varRef.getVariableReference();
-
- EquivalenceClass ecRight = findEquivClass(varRight, equivClasses);
- if (ecRight != null) {
- ecRight.addMember(varLeft);
- } else {
- List<LogicalVariable> m = new LinkedList<LogicalVariable>();
- m.add(varRight);
- m.add(varLeft);
- EquivalenceClass ec = new EquivalenceClass(m, varRight);
- equivClasses.add(ec);
- if (AlgebricksConfig.DEBUG) {
- AlgebricksConfig.ALGEBRICKS_LOGGER.finest("--- New equivalence class: " + ec + "\n");
+ AssignOperator assignOp = (AssignOperator) op;
+ List<LogicalVariable> vars = assignOp.getVariables();
+ List<Mutable<ILogicalExpression>> exprs = assignOp.getExpressions();
+ for (int i = 0; i < vars.size(); i++) {
+ ILogicalExpression expr = exprs.get(i).getValue();
+ // Ignore functions that are in the doNotInline set.
+ if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+ AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+ if (doNotInlineFuncs.contains(funcExpr.getFunctionIdentifier())) {
+ continue;
}
}
- ecChange = true;
- } else if (((AbstractLogicalExpression) rhs).getExpressionTag() == LogicalExpressionTag.CONSTANT) {
- LogicalVariable varLeft = a.getVariables().get(0);
- List<LogicalVariable> m = new LinkedList<LogicalVariable>();
- m.add(varLeft);
- EquivalenceClass ec = new EquivalenceClass(m, (ConstantExpression) rhs);
- // equivClassesForParent.add(ec);
- equivClasses.add(ec);
- ecChange = true;
- }
- } else if (op.getOperatorTag() == LogicalOperatorTag.GROUP && !(context.checkIfInDontApplySet(this, op))) {
- GroupByOperator group = (GroupByOperator) op;
- Pair<Boolean, Boolean> r1 = processVarExprPairs(group.getGroupByList(), equivClasses);
- Pair<Boolean, Boolean> r2 = processVarExprPairs(group.getDecorList(), equivClasses);
- modified = modified || r1.first || r2.first;
- ecChange = r1.second || r2.second;
- }
- if (op.getOperatorTag() == LogicalOperatorTag.PROJECT) {
- assignVarsNeededByProject((ProjectOperator) op, equivClasses, context);
- } else {
- if (op.getOperatorTag() == LogicalOperatorTag.WRITE) {
- substVisitorForWrites.setEquivalenceClasses(equivClasses);
- if (op.acceptExpressionTransform(substVisitorForWrites)) {
- modified = true;
- }
- } else {
- substVisitor.setEquivalenceClasses(equivClasses);
- if (op.acceptExpressionTransform(substVisitor)) {
- modified = true;
- if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
- GroupByOperator group = (GroupByOperator) op;
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gp : group.getGroupByList()) {
- if (gp.first != null
- && gp.second.getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- LogicalVariable gv = ((VariableReferenceExpression) gp.second.getValue())
- .getVariableReference();
- Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> iter = group
- .getDecorList().iterator();
- while (iter.hasNext()) {
- Pair<LogicalVariable, Mutable<ILogicalExpression>> dp = iter.next();
- if (dp.first == null
- && dp.second.getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- LogicalVariable dv = ((VariableReferenceExpression) dp.second.getValue())
- .getVariableReference();
- if (dv == gv) {
- // The decor variable is redundant,
- // since it is
- // propagated as a grouping
- // variable.
- EquivalenceClass ec1 = findEquivClass(gv, equivClasses);
- if (ec1 != null) {
- ec1.addMember(gp.first);
- ec1.setVariableRepresentative(gp.first);
- } else {
- List<LogicalVariable> varList = new ArrayList<LogicalVariable>();
- varList.add(gp.first);
- varList.add(gv);
- ec1 = new EquivalenceClass(varList, gp.first);
- }
- iter.remove();
- break;
- }
- }
- }
- }
- }
- }
- }
+ varAssignRhs.put(vars.get(i), exprs.get(i).getValue());
}
}
- return new Pair<Boolean, Boolean>(modified, ecChange);
- }
- private Pair<Boolean, Boolean> processVarExprPairs(
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairs, List<EquivalenceClass> equivClasses) {
- boolean ecFromGroup = false;
+ // Descend into children.
boolean modified = false;
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : vePairs) {
- ILogicalExpression expr = p.second.getValue();
- if (p.first != null && expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
- VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
- LogicalVariable rhsVar = varRef.getVariableReference();
- ecFromGroup = true;
- EquivalenceClass ecRight = findEquivClass(rhsVar, equivClasses);
- if (ecRight != null) {
- LogicalVariable replacingVar = ecRight.getVariableRepresentative();
- if (replacingVar != null && replacingVar != rhsVar) {
- varRef.setVariable(replacingVar);
- modified = true;
- }
- }
- }
+ for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+ if (inlineVariables(inputOpRef, context)) {
+ modified = true;
+ }
}
- return new Pair<Boolean, Boolean>(modified, ecFromGroup);
+
+ if (performBottomUpAction(op)) {
+ modified = true;
+ }
+
+ if (modified) {
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ context.addToDontApplySet(this, op);
+ // Re-enable rules that we may have already tried. They could be applicable now after inlining.
+ context.removeFromAlreadyCompared(opRef.getValue());
+ }
+
+ return modified;
}
- // Instead of doing this, we could make Projection to be more expressive and
- // also take constants (or even expression), at the expense of a more
- // complex project push down.
- private void assignVarsNeededByProject(ProjectOperator op, List<EquivalenceClass> equivClasses,
- IOptimizationContext context) throws AlgebricksException {
- List<LogicalVariable> prVars = op.getVariables();
- int sz = prVars.size();
- for (int i = 0; i < sz; i++) {
- EquivalenceClass ec = findEquivClass(prVars.get(i), equivClasses);
- if (ec != null) {
- if (!ec.representativeIsConst()) {
- prVars.set(i, ec.getVariableRepresentative());
- }
- }
- }
- }
-
- private final static EquivalenceClass findEquivClass(LogicalVariable var, List<EquivalenceClass> equivClasses) {
- for (EquivalenceClass ec : equivClasses) {
- if (ec.contains(var)) {
- return ec;
- }
- }
- return null;
- }
-
- private class VariableSubstitutionVisitor implements ILogicalExpressionReferenceTransform {
- private List<EquivalenceClass> equivClasses;
+ protected class InlineVariablesVisitor implements ILogicalExpressionReferenceTransform {
+
+ private final Map<LogicalVariable, ILogicalExpression> varAssignRhs;
+ private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+ private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>();
+ private ILogicalOperator op;
private IOptimizationContext context;
- private final boolean doNotSubstWithConst;
-
- public VariableSubstitutionVisitor(boolean doNotSubstWithConst) {
- this.doNotSubstWithConst = doNotSubstWithConst;
+ // If set, only replace this variable reference.
+ private LogicalVariable targetVar;
+
+ public InlineVariablesVisitor(Map<LogicalVariable, ILogicalExpression> varAssignRhs) {
+ this.varAssignRhs = varAssignRhs;
}
-
+
+ public void setTargetVariable(LogicalVariable targetVar) {
+ this.targetVar = targetVar;
+ }
+
public void setContext(IOptimizationContext context) {
this.context = context;
}
- public void setEquivalenceClasses(List<EquivalenceClass> equivClasses) {
- this.equivClasses = equivClasses;
+ public void setOperator(ILogicalOperator op) throws AlgebricksException {
+ this.op = op;
+ liveVars.clear();
}
-
+
@Override
- public boolean transform(Mutable<ILogicalExpression> exprRef) {
+ public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
ILogicalExpression e = exprRef.getValue();
switch (((AbstractLogicalExpression) e).getExpressionTag()) {
case VARIABLE: {
- // look for a required substitution
LogicalVariable var = ((VariableReferenceExpression) e).getVariableReference();
+ // Restrict replacement to targetVar if it has been set.
+ if (targetVar != null && var != targetVar) {
+ return false;
+ }
+ // Make sure the variable has not been excluded from inlining.
if (context.shouldNotBeInlined(var)) {
return false;
}
- EquivalenceClass ec = findEquivClass(var, equivClasses);
- if (ec == null) {
+ ILogicalExpression rhs = varAssignRhs.get(var);
+ if (rhs == null) {
+ // Variable was not produced by an assign.
return false;
}
- if (ec.representativeIsConst()) {
- if (doNotSubstWithConst) {
- return false;
- }
- exprRef.setValue(ec.getConstRepresentative());
- return true;
- } else {
- LogicalVariable r = ec.getVariableRepresentative();
- if (!r.equals(var)) {
- exprRef.setValue(new VariableReferenceExpression(r));
- return true;
- } else {
+
+ // Make sure used variables from rhs are live.
+ if (liveVars.isEmpty()) {
+ VariableUtilities.getLiveVariables(op, liveVars);
+ }
+ rhsUsedVars.clear();
+ rhs.getUsedVariables(rhsUsedVars);
+ for (LogicalVariable rhsUsedVar : rhsUsedVars) {
+ if (!liveVars.contains(rhsUsedVar)) {
return false;
}
}
+
+ // Replace variable reference with a clone of the rhs expr.
+ exprRef.setValue(rhs.cloneExpression());
+ return true;
}
case FUNCTION_CALL: {
AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) e;
- boolean m = false;
+ boolean modified = false;
for (Mutable<ILogicalExpression> arg : fce.getArguments()) {
if (transform(arg)) {
- m = true;
+ modified = true;
}
}
- return m;
+ return modified;
}
default: {
return false;
}
}
}
-
}
}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java
index d54833e..431fca1 100644
--- a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InsertProjectBeforeUnionRule.java
@@ -87,6 +87,7 @@
projectOp.getInputs().add(new MutableObject<ILogicalOperator>(parentOp));
opUnion.getInputs().get(branch).setValue(projectOp);
projectOp.setPhysicalOperator(new StreamProjectPOperator());
+ context.computeAndSetTypeEnvironmentForOperator(projectOp);
context.computeAndSetTypeEnvironmentForOperator(parentOp);
}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
index 5c5fdb1..a8864fe 100644
--- a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceGroupByCombinerRule.java
@@ -38,7 +38,6 @@
if (context.checkIfInDontApplySet(this, op)) {
return false;
}
- context.addToDontApplySet(this, op);
if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
return false;
}
@@ -86,6 +85,7 @@
opRef3.setValue(newGbyOp);
typeGby(newGbyOp, context);
typeGby(gbyOp, context);
+ context.addToDontApplySet(this, op);
return true;
}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
new file mode 100644
index 0000000..a057f4f
--- /dev/null
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Projects away unused variables at the earliest possible point.
+ * Does a full DFS sweep of the plan adding ProjectOperators in the bottom-up pass.
+ * Also, removes projects that have become useless.
+ * TODO: This rule 'recklessly' adds as many projects as possible, but there is no guarantee
+ * that the overall cost of the plan is reduced since project operators also add a cost.
+ */
+public class IntroduceProjectsRule implements IAlgebraicRewriteRule {
+
+    private final Set<LogicalVariable> usedVars = new HashSet<LogicalVariable>();
+    private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
+    private final Set<LogicalVariable> producedVars = new HashSet<LogicalVariable>();
+    private final List<LogicalVariable> projectVars = new ArrayList<LogicalVariable>();
+    // Set by the first rewritePre() call so that the sweep is performed only once.
+    protected boolean hasRun = false;
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        if (hasRun) {
+            return false;
+        }
+        hasRun = true;
+        return introduceProjects(null, -1, opRef, Collections.<LogicalVariable> emptySet(), context);
+    }
+
+    // Processes the subtree rooted at opRef. parentOp/parentInputIndex locate opRef among its parent's
+    // inputs (null/-1 for the root of the sweep); parentUsedVars are the variables used by the operators
+    // above opRef. Returns true if a project was added or removed somewhere in the subtree.
+    protected boolean introduceProjects(AbstractLogicalOperator parentOp, int parentInputIndex,
+            Mutable<ILogicalOperator> opRef, Set<LogicalVariable> parentUsedVars, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        boolean modified = false;
+
+        usedVars.clear();
+        VariableUtilities.getUsedVariables(op, usedVars);
+
+        // In the top-down pass, maintain a set of variables that are used in op and all its parents.
+        Set<LogicalVariable> parentsUsedVars = new HashSet<LogicalVariable>(parentUsedVars);
+        parentsUsedVars.addAll(usedVars);
+
+        // Descend into children.
+        for (int i = 0; i < op.getInputs().size(); i++) {
+            Mutable<ILogicalOperator> inputOpRef = op.getInputs().get(i);
+            if (introduceProjects(op, i, inputOpRef, parentsUsedVars, context)) {
+                modified = true;
+            }
+        }
+
+        if (modified) {
+            context.computeAndSetTypeEnvironmentForOperator(op);
+        }
+        // In the bottom-up pass, determine which live variables are not used by op's parents.
+        // Such variables are projected away.
+        liveVars.clear();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        producedVars.clear();
+        VariableUtilities.getProducedVariables(op, producedVars);
+        liveVars.removeAll(producedVars);
+
+        projectVars.clear();
+        for (LogicalVariable liveVar : liveVars) {
+            if (parentsUsedVars.contains(liveVar)) {
+                projectVars.add(liveVar);
+            }
+        }
+
+        // Some of the variables that are live at this op are not used above.
+        if (projectVars.size() != liveVars.size()) {
+            // Add a project operator under each of op's qualifying input branches.
+            for (int i = 0; i < op.getInputs().size(); i++) {
+                ILogicalOperator childOp = op.getInputs().get(i).getValue();
+                liveVars.clear();
+                VariableUtilities.getLiveVariables(childOp, liveVars);
+                List<LogicalVariable> vars = new ArrayList<LogicalVariable>(projectVars);
+                // Only retain those variables that are live in the i-th input branch.
+                vars.retainAll(liveVars);
+                if (vars.size() != liveVars.size()) {
+                    ProjectOperator projectOp = new ProjectOperator(vars);
+                    projectOp.getInputs().add(new MutableObject<ILogicalOperator>(childOp));
+                    op.getInputs().get(i).setValue(projectOp);
+                    context.computeAndSetTypeEnvironmentForOperator(projectOp);
+                    modified = true;
+                }
+            }
+        } else if (op.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+            // Check if the existing project has become useless.
+            liveVars.clear();
+            VariableUtilities.getLiveVariables(op.getInputs().get(0).getValue(), liveVars);
+            ProjectOperator projectOp = (ProjectOperator) op;
+            List<LogicalVariable> existingProjectVars = projectOp.getVariables();
+            if (liveVars.size() == existingProjectVars.size() && liveVars.containsAll(existingProjectVars)) {
+                // For UnionAll the variables must also be in exactly the correct order.
+                boolean eliminateProject = parentOp == null
+                        || parentOp.getOperatorTag() != LogicalOperatorTag.UNIONALL
+                        || canEliminateProjectBelowUnion((UnionAllOperator) parentOp, projectOp, parentInputIndex);
+                if (eliminateProject) {
+                    // The existing project has become useless. Splice it out (opRef also covers the root).
+                    opRef.setValue(op.getInputs().get(0).getValue());
+                    modified = true;
+                }
+            }
+        }
+
+        if (modified) {
+            context.computeAndSetTypeEnvironmentForOperator(op);
+        }
+        return modified;
+    }
+
+    // Checks that projectOp's input already has its live variables in the order of unionOp's mappings.
+    private boolean canEliminateProjectBelowUnion(UnionAllOperator unionOp, ProjectOperator projectOp,
+            int unionInputIndex) throws AlgebricksException {
+        List<LogicalVariable> orderedLiveVars = new ArrayList<LogicalVariable>();
+        VariableUtilities.getLiveVariables(projectOp.getInputs().get(0).getValue(), orderedLiveVars);
+        int numVars = orderedLiveVars.size();
+        if (numVars != unionOp.getVariableMappings().size()) {
+            return false;
+        }
+        for (int i = 0; i < numVars; i++) {
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> varTriple = unionOp.getVariableMappings().get(i);
+            LogicalVariable expectedVar = unionInputIndex == 0 ? varTriple.first : varTriple.second;
+            if (expectedVar != orderedLiveVars.get(i)) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PullSelectOutOfEqJoin.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PullSelectOutOfEqJoin.java
index 75862cf..8b4f0a1 100644
--- a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PullSelectOutOfEqJoin.java
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PullSelectOutOfEqJoin.java
@@ -38,9 +38,6 @@
public class PullSelectOutOfEqJoin implements IAlgebraicRewriteRule {
- private List<Mutable<ILogicalExpression>> eqVarVarComps = new ArrayList<Mutable<ILogicalExpression>>();
- private List<Mutable<ILogicalExpression>> otherPredicates = new ArrayList<Mutable<ILogicalExpression>>();
-
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
return false;
@@ -66,8 +63,8 @@
if (!fi.equals(AlgebricksBuiltinFunctions.AND)) {
return false;
}
- eqVarVarComps.clear();
- otherPredicates.clear();
+ List<Mutable<ILogicalExpression>> eqVarVarComps = new ArrayList<Mutable<ILogicalExpression>>();
+ List<Mutable<ILogicalExpression>> otherPredicates = new ArrayList<Mutable<ILogicalExpression>>();
for (Mutable<ILogicalExpression> arg : fexp.getArguments()) {
if (isEqVarVar(arg.getValue())) {
eqVarVarComps.add(arg);
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
new file mode 100644
index 0000000..1bcf95a
--- /dev/null
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
@@ -0,0 +1,149 @@
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Pushes an AssignOperator below a UnionAll operator by creating a new AssignOperator below each of
+ * the UnionAllOperator's branches with appropriate variable replacements.
+ * This rule can help to enable other rules that are difficult to fire across a UnionAllOperator,
+ * for example, eliminating common sub-expressions.
+ *
+ * Example:
+ *
+ * Before plan:
+ * ...
+ * assign [$$20, $$21] <- [funcA($$3), funcB($$6)]
+ * union ($$1, $$2, $$3) ($$4, $$5, $$6)
+ * union_branch_0
+ * ...
+ * union_branch_1
+ * ...
+ *
+ * After plan:
+ * ...
+ * union ($$1, $$2, $$3) ($$4, $$5, $$6) ($$22, $$24, $$20) ($$23, $$25, $$21)
+ * assign [$$22, $$23] <- [funcA($$1), funcB($$4)]
+ * union_branch_0
+ * ...
+ * assign [$$24, $$25] <- [funcA($$2), funcB($$5)]
+ * union_branch_1
+ * ...
+ */
+public class PushAssignBelowUnionAllRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false; // Nothing is rewritten on the way down; this rule does its work in rewritePost.
+ }
+
+    /**
+     * Looks for inputs of the visited operator that are an assign directly above a union-all. Each such assign
+     * is cloned below both union branches, the union gets extra mappings that re-create the original assign
+     * variables from the clones' variables, and the original assign is unlinked from the plan.
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator parentOp = (AbstractLogicalOperator) opRef.getValue();
+        if (!parentOp.hasInputs()) {
+            return false;
+        }
+        boolean changed = false;
+        for (int inputIdx = 0; inputIdx < parentOp.getInputs().size(); inputIdx++) {
+            AbstractLogicalOperator inputOp = (AbstractLogicalOperator) parentOp.getInputs().get(inputIdx).getValue();
+            if (inputOp.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+                continue;
+            }
+            AssignOperator assignOp = (AssignOperator) inputOp;
+            AbstractLogicalOperator belowAssignOp = (AbstractLogicalOperator) assignOp.getInputs().get(0).getValue();
+            if (belowAssignOp.getOperatorTag() != LogicalOperatorTag.UNIONALL) {
+                continue;
+            }
+            UnionAllOperator unionOp = (UnionAllOperator) belowAssignOp;
+
+            Set<LogicalVariable> usedByAssign = new HashSet<LogicalVariable>();
+            VariableUtilities.getUsedVariables(assignOp, usedByAssign);
+            List<LogicalVariable> originalVars = assignOp.getVariables();
+            // Clone the assign under each union branch; the array has room for exactly two branches.
+            AssignOperator[] branchAssigns = new AssignOperator[2];
+            for (int branch = 0; branch < unionOp.getInputs().size(); branch++) {
+                branchAssigns[branch] = createAssignBelowUnionAllBranch(unionOp, branch, assignOp, usedByAssign,
+                        context);
+            }
+            // New mapping per assigned variable: (branch-0 clone var, branch-1 clone var, original var).
+            for (int v = 0; v < originalVars.size(); v++) {
+                unionOp.getVariableMappings().add(new Triple<LogicalVariable, LogicalVariable, LogicalVariable>(
+                        branchAssigns[0].getVariables().get(v), branchAssigns[1].getVariables().get(v),
+                        originalVars.get(v)));
+            }
+            context.computeAndSetTypeEnvironmentForOperator(unionOp);
+
+            // Bypass the original assign: the parent now reads directly from the union.
+            parentOp.getInputs().set(inputIdx, assignOp.getInputs().get(0));
+            context.computeAndSetTypeEnvironmentForOperator(parentOp);
+            changed = true;
+        }
+        return changed;
+    }
+
+    /**
+     * Inserts a clone of originalAssignOp between the union and its input at inputIndex, then rewrites the
+     * clone so that every union output variable it used refers to that branch's own input variable instead.
+     */
+    private AssignOperator createAssignBelowUnionAllBranch(UnionAllOperator unionOp, int inputIndex,
+            AssignOperator originalAssignOp, Set<LogicalVariable> assignUsedVars, IOptimizationContext context)
+            throws AlgebricksException {
+        AssignOperator branchAssign = cloneAssignOperator(originalAssignOp, context);
+        Mutable<ILogicalOperator> branchRef = unionOp.getInputs().get(inputIndex);
+        branchAssign.getInputs().add(new MutableObject<ILogicalOperator>(branchRef.getValue()));
+        context.computeAndSetTypeEnvironmentForOperator(branchAssign);
+        branchRef.setValue(branchAssign);
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> mappings = unionOp.getVariableMappings();
+        for (int m = 0, count = mappings.size(); m < count; m++) {
+            Triple<LogicalVariable, LogicalVariable, LogicalVariable> mapping = mappings.get(m);
+            if (!assignUsedVars.contains(mapping.third)) {
+                continue;
+            }
+            LogicalVariable branchVar = (inputIndex == 0) ? mapping.first : mapping.second;
+            VariableUtilities.substituteVariables(branchAssign, mapping.third, branchVar, context);
+        }
+        return branchAssign;
+    }
+
+    /**
+     * Copies the given assign: expressions are cloned, produced variables are fresh ones from the context, the
+     * execution mode is carried over, and the copy's inputs are left empty.
+     */
+    private AssignOperator cloneAssignOperator(AssignOperator assignOp, IOptimizationContext context) {
+        int numVars = assignOp.getVariables().size();
+        List<LogicalVariable> freshVars = new ArrayList<LogicalVariable>();
+        List<Mutable<ILogicalExpression>> clonedExprs = new ArrayList<Mutable<ILogicalExpression>>();
+        for (int pos = 0; pos < numVars; pos++) {
+            freshVars.add(context.newVar());
+            ILogicalExpression exprCopy = assignOp.getExpressions().get(pos).getValue().cloneExpression();
+            clonedExprs.add(new MutableObject<ILogicalExpression>(exprCopy));
+        }
+        AssignOperator clone = new AssignOperator(freshVars, clonedExprs);
+        clone.setExecutionMode(assignOp.getExecutionMode());
+        return clone;
+    }
+}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushFunctionsBelowJoin.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushFunctionsBelowJoin.java
new file mode 100644
index 0000000..16b010e
--- /dev/null
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushFunctionsBelowJoin.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Pushes function-call expressions below a join if possible.
+ * Assigns the result of such function-call expressions to new variables, and replaces the original
+ * expression with a corresponding variable reference expression.
+ * This rule can help reduce the cost of computing expensive functions by pushing them below
+ * a join (which may blow up the cardinality).
+ * Also, this rule may help to enable other rules such as common subexpression elimination, again to reduce
+ * the number of calls to expensive functions.
+ *
+ * Example: (we are pushing pushMeFunc)
+ *
+ * Before plan:
+ * assign [$$10] <- [funcA(funcB(pushMeFunc($$3, $$4)))]
+ * join (some condition)
+ * join_branch_0 where $$3 and $$4 are not live
+ * ...
+ * join_branch_1 where $$3 and $$4 are live
+ * ...
+ *
+ * After plan:
+ * assign [$$10] <- [funcA(funcB($$11))]
+ * join (some condition)
+ * join_branch_0 where $$3 and $$4 are not live
+ * ...
+ * join_branch_1 where $$3 and $$4 are live
+ * assign[$$11] <- [pushMeFunc($$3, $$4)]
+ * ...
+ */
+public class PushFunctionsBelowJoin implements IAlgebraicRewriteRule {
+
+ private final Set<FunctionIdentifier> toPushFuncIdents;
+ private final List<Mutable<ILogicalExpression>> funcExprs = new ArrayList<Mutable<ILogicalExpression>>();
+ private final List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
+ private final List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
+
+ public PushFunctionsBelowJoin(Set<FunctionIdentifier> toPushFuncIdents) {
+ this.toPushFuncIdents = toPushFuncIdents; // Kept by reference (no copy); consulted in gatherFunctionCalls.
+ }
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false; // Nothing is rewritten on the way down; this rule does its work in rewritePost.
+ }
+
+    /**
+     * Handles assign operators only: locates a join below the assign, collects the assign's calls to the
+     * configured functions, and tries to move each call into a new assign under one of the join's two inputs.
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator visitedOp = (AbstractLogicalOperator) opRef.getValue();
+        if (visitedOp.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
+            return false;
+        }
+        AssignOperator assignOp = (AssignOperator) visitedOp;
+
+        // Without a reachable join there is nothing to push below.
+        Mutable<ILogicalOperator> joinRef = findJoinOp(assignOp.getInputs().get(0));
+        if (joinRef == null) {
+            return false;
+        }
+        AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) joinRef.getValue();
+
+        // Collect the calls to push-candidate functions used by this assign.
+        funcExprs.clear();
+        gatherFunctionCalls(assignOp, funcExprs);
+        if (funcExprs.isEmpty()) {
+            return false;
+        }
+
+        // Both branches are always attempted; calls pushed into branch 0 are no longer candidates for branch 1.
+        boolean pushedIntoFirst = pushDownFunctions(joinOp, 0, funcExprs, context);
+        boolean pushedIntoSecond = pushDownFunctions(joinOp, 1, funcExprs, context);
+        if (!pushedIntoFirst && !pushedIntoSecond) {
+            return false;
+        }
+        context.computeAndSetTypeEnvironmentForOperator(joinOp);
+        return true;
+    }
+
+    /**
+     * Walks down from opRef looking for an inner or left-outer join. The search gives up at group-by,
+     * aggregate, distinct and unnest-map operators. For any other operator only its first input is followed.
+     *
+     * @return the reference holding the join, or null if none is found along that path.
+     */
+    private Mutable<ILogicalOperator> findJoinOp(Mutable<ILogicalOperator> opRef) {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        switch (op.getOperatorTag()) {
+            case INNERJOIN:
+            case LEFTOUTERJOIN:
+                return opRef;
+            case GROUP:
+            case AGGREGATE:
+            case DISTINCT:
+            case UNNEST_MAP:
+                return null;
+            default:
+                // Descend into the first input only; an operator without inputs ends the search.
+                List<Mutable<ILogicalOperator>> children = op.getInputs();
+                return children.isEmpty() ? null : findJoinOp(children.get(0));
+        }
+    }
+
+    /** Gathers, over all expressions of the assign, the calls to functions named in toPushFuncIdents. */
+    private void gatherFunctionCalls(AssignOperator assignOp, List<Mutable<ILogicalExpression>> funcExprs) {
+        List<Mutable<ILogicalExpression>> assignExprs = assignOp.getExpressions();
+        for (int i = 0; i < assignExprs.size(); i++) {
+            gatherFunctionCalls(assignExprs.get(i), funcExprs);
+        }
+    }
+
+    /** Adds exprRef to funcExprs if it calls a function to push, then searches the call's arguments likewise. */
+    private void gatherFunctionCalls(Mutable<ILogicalExpression> exprRef, List<Mutable<ILogicalExpression>> funcExprs) {
+        AbstractLogicalExpression expr = (AbstractLogicalExpression) exprRef.getValue();
+        if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+            AbstractFunctionCallExpression call = (AbstractFunctionCallExpression) expr;
+            if (toPushFuncIdents.contains(call.getFunctionIdentifier())) {
+                funcExprs.add(exprRef);
+            }
+            for (Mutable<ILogicalExpression> argRef : call.getArguments()) {
+                gatherFunctionCalls(argRef, funcExprs);
+            }
+        }
+    }
+
+    /**
+     * Moves every gathered function call whose used variables are all live in the join input at inputIndex
+     * into a new assign placed directly under that input. Each moved call is replaced, where it was found, by
+     * a reference to a fresh variable and is removed from funcExprs. Returns true if an assign was created.
+     */
+    private boolean pushDownFunctions(AbstractBinaryJoinOperator joinOp, int inputIndex,
+            List<Mutable<ILogicalExpression>> funcExprs, IOptimizationContext context) throws AlgebricksException {
+        Mutable<ILogicalOperator> branchRef = joinOp.getInputs().get(inputIndex);
+        ILogicalOperator branchOp = branchRef.getValue();
+        liveVars.clear();
+        VariableUtilities.getLiveVariables(branchOp, liveVars);
+        List<LogicalVariable> pushedVars = new ArrayList<LogicalVariable>();
+        List<Mutable<ILogicalExpression>> pushedExprs = new ArrayList<Mutable<ILogicalExpression>>();
+        for (Iterator<Mutable<ILogicalExpression>> it = funcExprs.iterator(); it.hasNext();) {
+            Mutable<ILogicalExpression> callRef = it.next();
+            ILogicalExpression call = callRef.getValue();
+            usedVars.clear();
+            call.getUsedVariables(usedVars);
+            if (!liveVars.containsAll(usedVars)) {
+                continue; // Needs a variable this branch does not provide.
+            }
+            // Compute the call below the join and refer to its result through a fresh variable.
+            LogicalVariable resultVar = context.newVar();
+            pushedVars.add(resultVar);
+            pushedExprs.add(new MutableObject<ILogicalExpression>(call));
+            callRef.setValue(new VariableReferenceExpression(resultVar));
+            it.remove();
+        }
+        if (pushedVars.isEmpty()) {
+            return false;
+        }
+        AssignOperator pushedAssign = new AssignOperator(pushedVars, pushedExprs);
+        pushedAssign.getInputs().add(new MutableObject<ILogicalOperator>(branchOp));
+        pushedAssign.setExecutionMode(joinOp.getExecutionMode());
+        branchRef.setValue(pushedAssign);
+        context.computeAndSetTypeEnvironmentForOperator(pushedAssign);
+        return true;
+    }
+}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
index 8c679c5..99a6b8c 100644
--- a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushSelectIntoJoinRule.java
@@ -143,11 +143,11 @@
if (!intersectsBranch[0] && !intersectsBranch[1]) {
return false;
}
+ if (needToPushOps) {
+ pushOps(pushedOnLeft, joinBranchLeftRef, context);
+ pushOps(pushedOnRight, joinBranchRightRef, context);
+ }
if (intersectsAllBranches) {
- if (needToPushOps) {
- pushOps(pushedOnLeft, joinBranchLeftRef, context);
- pushOps(pushedOnRight, joinBranchRightRef, context);
- }
addCondToJoin(select, join, context);
} else { // push down
Iterator<Mutable<ILogicalOperator>> branchIter = join.getInputs().iterator();
@@ -156,13 +156,6 @@
Mutable<ILogicalOperator> branch = branchIter.next();
boolean inter = intersectsBranch[j];
if (inter) {
- if (needToPushOps) {
- if (j == 0) {
- pushOps(pushedOnLeft, joinBranchLeftRef, context);
- } else {
- pushOps(pushedOnRight, joinBranchRightRef, context);
- }
- }
copySelectToBranch(select, branch, context);
}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java
new file mode 100644
index 0000000..1106898
--- /dev/null
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantGroupByDecorVars.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Removes duplicate variables from a group-by operator's decor list.
+ */
+public class RemoveRedundantGroupByDecorVars implements IAlgebraicRewriteRule {
+
+ private final Set<LogicalVariable> vars = new HashSet<LogicalVariable>();
+
+    /**
+     * For a group-by operator, drops every decor entry that has no variable of its own and whose expression
+     * is a reference to a variable already referenced by an earlier such entry. A modified operator is put in
+     * the don't-apply set, and operators already in that set are skipped.
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.GROUP || context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+        vars.clear();
+
+        boolean removedAny = false;
+        Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorIter = ((GroupByOperator) op).getDecorList()
+                .iterator();
+        while (decorIter.hasNext()) {
+            Pair<LogicalVariable, Mutable<ILogicalExpression>> decor = decorIter.next();
+            // Only entries of the form (null, variable reference) are candidates for removal.
+            if (decor.first == null && decor.second.getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                LogicalVariable var = ((VariableReferenceExpression) decor.second.getValue()).getVariableReference();
+                // Set.add returns false when the variable was seen before, i.e. this entry is a duplicate.
+                if (!vars.add(var)) {
+                    decorIter.remove();
+                    removedAny = true;
+                }
+            }
+        }
+        if (removedAny) {
+            context.addToDontApplySet(this, op);
+        }
+        return removedAny;
+    }
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false; // Nothing is rewritten on the way down; this rule does its work in rewritePost.
+ }
+}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
new file mode 100644
index 0000000..ec57be5
--- /dev/null
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Replaces redundant variable references with their bottom-most equivalent representative.
+ * Does a DFS sweep over the plan keeping track of variable equivalence classes.
+ * For example, this rule would perform the following rewrite.
+ *
+ * Before Plan:
+ * select (function-call: func, Args:[%0->$$11])
+ * project [$$11]
+ * assign [$$11] <- [$$10]
+ * assign [$$10] <- [$$9]
+ * assign [$$9] <- ...
+ * ...
+ *
+ * After Plan:
+ * select (function-call: func, Args:[%0->$$9])
+ * project [$$9]
+ * assign [$$11] <- [$$9]
+ * assign [$$10] <- [$$9]
+ * assign [$$9] <- ...
+ * ...
+ */
+public class RemoveRedundantVariablesRule implements IAlgebraicRewriteRule {
+
+ private final VariableSubstitutionVisitor substVisitor = new VariableSubstitutionVisitor();
+ private final Map<LogicalVariable, List<LogicalVariable>> equivalentVarsMap = new HashMap<LogicalVariable, List<LogicalVariable>>();
+
+ protected boolean hasRun = false;
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+ return false; // Nothing is rewritten on the way up; this rule does its work in rewritePre.
+ }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        // Operators this rule already rewrote are in the don't-apply set and are not swept again.
+        if (!context.checkIfInDontApplySet(this, opRef.getValue())) {
+            equivalentVarsMap.clear(); // Every sweep rebuilds the equivalence classes from scratch.
+            if (removeRedundantVariables(opRef, context)) {
+                context.computeAndSetTypeEnvironmentForOperator(opRef.getValue());
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Records that lhs is an alias of rhs by mapping lhs to the equivalence class of rhs (created on demand). */
+    private void updateEquivalenceClassMap(LogicalVariable lhs, LogicalVariable rhs) {
+        List<LogicalVariable> equivalentVars = equivalentVarsMap.get(rhs);
+        if (equivalentVars == null) {
+            equivalentVars = new ArrayList<LogicalVariable>();
+            // The first element in the list is the bottom-most representative which will replace all equivalent vars.
+            equivalentVars.add(rhs);
+            equivalentVars.add(lhs);
+            equivalentVarsMap.put(rhs, equivalentVars);
+        }
+        equivalentVarsMap.put(lhs, equivalentVars);
+    }
+
+ private boolean removeRedundantVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException { // Returns true if this operator or anything below it was changed.
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ boolean modified = false;
+
+ // Recurse into children first, so equivalences found below are known before this operator is rewritten.
+ for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+ if (removeRedundantVariables(inputOpRef, context)) {
+ modified = true;
+ }
+ }
+
+ // An assign of the form var <- otherVar makes var an alias of otherVar: record it in the class map.
+ if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
+ AssignOperator assignOp = (AssignOperator) op;
+ int numVars = assignOp.getVariables().size();
+ for (int i = 0; i < numVars; i++) {
+ ILogicalExpression expr = assignOp.getExpressions().get(i).getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ continue; // Only plain variable references create an equivalence.
+ }
+ VariableReferenceExpression rhsVarRefExpr = (VariableReferenceExpression) expr;
+ // Update equivalence class map.
+ LogicalVariable lhs = assignOp.getVariables().get(i);
+ LogicalVariable rhs = rhsVarRefExpr.getVariableReference();
+ updateEquivalenceClassMap(lhs, rhs);
+ }
+ }
+
+ // Replace variable references with their first representative.
+ if (op.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+ // The project operator does not use expressions, so we need to replace its variables manually.
+ if (replaceProjectVars((ProjectOperator) op)) {
+ modified = true;
+ }
+ } else if(op.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
+ // Replace redundant variables manually in the UnionAll operator.
+ if (replaceUnionAllVars((UnionAllOperator) op)) {
+ modified = true;
+ }
+ } else {
+ if (op.acceptExpressionTransform(substVisitor)) { // All other operators are rewritten through their expressions.
+ modified = true;
+ }
+ }
+
+ // Perform variable replacement in nested plans.
+ if (op.hasNestedPlans()) {
+ AbstractOperatorWithNestedPlans opWithNestedPlan = (AbstractOperatorWithNestedPlans) op;
+ for (ILogicalPlan nestedPlan : opWithNestedPlan.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> rootRef : nestedPlan.getRoots()) {
+ if (removeRedundantVariables(rootRef, context)) {
+ modified = true;
+ }
+ }
+ }
+ }
+
+ // Deal with re-mapping of variables in group by.
+ if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
+ if (handleGroupByVarRemapping((GroupByOperator) op)) {
+ modified = true;
+ }
+ }
+
+ if (modified) {
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ context.addToDontApplySet(this, op); // rewritePre will not start another sweep from this operator.
+ }
+ return modified;
+ }
+
+    /**
+     * For each group-by pair (v, ref to g), drops the first decor entry of the form (null, ref to g) and ties
+     * v to g: v becomes the representative of g's class if one exists, else updateEquivalenceClassMap(v, g).
+     */
+    private boolean handleGroupByVarRemapping(GroupByOperator groupOp) {
+        boolean changed = false;
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> gbPair : groupOp.getGroupByList()) {
+            if (gbPair.first == null || gbPair.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                continue;
+            }
+            LogicalVariable groupedVar = ((VariableReferenceExpression) gbPair.second.getValue()).getVariableReference();
+            Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorIter = groupOp.getDecorList().iterator();
+            while (decorIter.hasNext()) {
+                Pair<LogicalVariable, Mutable<ILogicalExpression>> decorPair = decorIter.next();
+                if (decorPair.first != null || decorPair.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE
+                        || ((VariableReferenceExpression) decorPair.second.getValue()).getVariableReference() != groupedVar) {
+                    continue;
+                }
+                List<LogicalVariable> groupedVarClass = equivalentVarsMap.get(groupedVar);
+                if (groupedVarClass == null) {
+                    updateEquivalenceClassMap(gbPair.first, groupedVar);
+                } else {
+                    groupedVarClass.set(0, gbPair.first);
+                    equivalentVarsMap.put(gbPair.first, groupedVarClass);
+                }
+                decorIter.remove();
+                changed = true;
+                break;
+            }
+        }
+        return changed;
+    }
+
+    /**
+     * Swaps each variable of the project for the representative of its equivalence class, when it has one
+     * and is not already that representative. The VariableSubstitutionVisitor cannot do this because a
+     * project holds plain variables rather than expressions.
+     *
+     * @return true if at least one variable was replaced.
+     */
+    private boolean replaceProjectVars(ProjectOperator op) throws AlgebricksException {
+        boolean replacedAny = false;
+        List<LogicalVariable> projectVars = op.getVariables();
+        for (int pos = 0, count = projectVars.size(); pos < count; pos++) {
+            LogicalVariable current = projectVars.get(pos);
+            List<LogicalVariable> varClass = equivalentVarsMap.get(current);
+            if (varClass == null) {
+                continue; // Not recorded as equivalent to anything.
+            }
+            // Element 0 of a class is its representative.
+            LogicalVariable representative = varClass.get(0);
+            if (representative != current) {
+                projectVars.set(pos, representative);
+                replacedAny = true;
+            }
+        }
+        return replacedAny;
+    }
+
+    /** Replaces the union's input variables with their class representatives; true only if one actually changed. */
+    private boolean replaceUnionAllVars(UnionAllOperator op) throws AlgebricksException {
+        boolean modified = false;
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping : op.getVariableMappings()) {
+            List<LogicalVariable> firstEquivalentVars = equivalentVarsMap.get(varMapping.first);
+            List<LogicalVariable> secondEquivalentVars = equivalentVarsMap.get(varMapping.second);
+            if (firstEquivalentVars != null && firstEquivalentVars.get(0) != varMapping.first) { // Skip no-op swaps.
+                varMapping.first = firstEquivalentVars.get(0);
+                modified = true;
+            }
+            if (secondEquivalentVars != null && secondEquivalentVars.get(0) != varMapping.second) {
+                varMapping.second = secondEquivalentVars.get(0);
+                modified = true;
+            }
+        }
+        return modified;
+    }
+
+    /**
+     * Expression transform that redirects variable references to the representative of their equivalence
+     * class, descending through the arguments of function calls. Other kinds of expressions are left alone.
+     */
+    private class VariableSubstitutionVisitor implements ILogicalExpressionReferenceTransform {
+        /** @return true if any variable reference inside exprRef was changed. */
+        @Override
+        public boolean transform(Mutable<ILogicalExpression> exprRef) {
+            ILogicalExpression expr = exprRef.getValue();
+            switch (((AbstractLogicalExpression) expr).getExpressionTag()) {
+                case VARIABLE: {
+                    VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+                    LogicalVariable current = varRef.getVariableReference();
+                    List<LogicalVariable> varClass = equivalentVarsMap.get(current);
+                    // Element 0 of a class is its representative.
+                    if (varClass == null || varClass.get(0) == current) {
+                        return false; // No class recorded, or already the representative.
+                    }
+                    varRef.setVariable(varClass.get(0));
+                    return true;
+                }
+                case FUNCTION_CALL: {
+                    boolean changed = false;
+                    for (Mutable<ILogicalExpression> argRef : ((AbstractFunctionCallExpression) expr).getArguments()) {
+                        // Every argument is visited, even after an earlier one has already changed.
+                        changed |= transform(argRef);
+                    }
+                    return changed;
+                }
+                default: {
+                    // Any other kind of expression is left untouched.
+                    return false;
+                }
+            }
+        }
+    }
+}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
index c53ea0a..e0c2741 100644
--- a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
@@ -23,6 +23,7 @@
import org.apache.commons.lang3.mutable.Mutable;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -33,10 +34,14 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+/**
+ * Removes unused variables from Assign, Unnest, Aggregate, and UnionAll operators.
+ */
public class RemoveUnusedAssignAndAggregateRule implements IAlgebraicRewriteRule {
@Override
@@ -55,7 +60,7 @@
if (smthToRemove) {
removeUnusedAssigns(opRef, toRemove, context);
}
- return smthToRemove;
+ return !toRemove.isEmpty();
}
private void removeUnusedAssigns(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> toRemove,
@@ -87,28 +92,59 @@
private int removeFromAssigns(AbstractLogicalOperator op, Set<LogicalVariable> toRemove,
IOptimizationContext context) throws AlgebricksException {
- if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
- AssignOperator assign = (AssignOperator) op;
- if (removeUnusedVarsAndExprs(toRemove, assign.getVariables(), assign.getExpressions())) {
- context.computeAndSetTypeEnvironmentForOperator(assign);
+ switch (op.getOperatorTag()) { // dispatch on operator kind; tags without a case leave the switch and yield -1
+ case ASSIGN: {
+ AssignOperator assign = (AssignOperator) op;
+ if (removeUnusedVarsAndExprs(toRemove, assign.getVariables(), assign.getExpressions())) {
+ context.computeAndSetTypeEnvironmentForOperator(assign);
+ }
+ return assign.getVariables().size(); // size of the assign's variable list after the pruning call above
}
- return assign.getVariables().size();
- } else if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
- AggregateOperator agg = (AggregateOperator) op;
- if (removeUnusedVarsAndExprs(toRemove, agg.getVariables(), agg.getExpressions())) {
- context.computeAndSetTypeEnvironmentForOperator(agg);
+ case AGGREGATE: {
+ AggregateOperator agg = (AggregateOperator) op;
+ if (removeUnusedVarsAndExprs(toRemove, agg.getVariables(), agg.getExpressions())) {
+ context.computeAndSetTypeEnvironmentForOperator(agg);
+ }
+ return agg.getVariables().size(); // size of the aggregate's variable list after the pruning call above
}
- return agg.getVariables().size();
- } else if (op.getOperatorTag() == LogicalOperatorTag.UNNEST) {
- UnnestOperator uOp = (UnnestOperator) op;
- LogicalVariable pVar = uOp.getPositionalVariable();
- if (pVar != null && toRemove.contains(pVar)) {
- uOp.setPositionalVariable(null);
+ case UNNEST: {
+ UnnestOperator uOp = (UnnestOperator) op;
+ LogicalVariable pVar = uOp.getPositionalVariable();
+ if (pVar != null && toRemove.contains(pVar)) {
+ uOp.setPositionalVariable(null);
+ }
+ break; // UNNEST reports no count: control leaves the switch and the method returns -1
+ }
+ case UNIONALL: {
+ UnionAllOperator unionOp = (UnionAllOperator) op;
+ if (removeUnusedVarsFromUnionAll(unionOp, toRemove)) { // true when at least one mapping was dropped, so the type environment is recomputed
+ context.computeAndSetTypeEnvironmentForOperator(unionOp);
+ }
+ return unionOp.getVariableMappings().size(); // number of variable mappings left on the union
}
}
return -1;
}
+ private boolean removeUnusedVarsFromUnionAll(UnionAllOperator unionOp, Set<LogicalVariable> toRemove) { // drops mappings whose third variable is in toRemove; returns true iff any mapping was dropped
+ Iterator<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> iter = unionOp.getVariableMappings()
+ .iterator();
+ boolean modified = false;
+ Set<LogicalVariable> removeFromRemoveSet = new HashSet<LogicalVariable>(); // variables to take back out of toRemove once the loop is done
+ while (iter.hasNext()) {
+ Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping = iter.next();
+ if (toRemove.contains(varMapping.third)) {
+ iter.remove(); // removes the mapping from the list returned by getVariableMappings(), safely during iteration
+ modified = true;
+ }
+ // In any case, make sure we do not remove these variables.
+ removeFromRemoveSet.add(varMapping.first);
+ removeFromRemoveSet.add(varMapping.second);
+ }
+ toRemove.removeAll(removeFromRemoveSet); // side effect on the caller's set: every first/second variable seen above is no longer marked for removal
+ return modified;
+ }
+
private boolean removeUnusedVarsAndExprs(Set<LogicalVariable> toRemove, List<LogicalVariable> varList,
List<Mutable<ILogicalExpression>> exprList) {
boolean changed = false;
@@ -142,22 +178,41 @@
}
}
}
- if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
- AssignOperator assign = (AssignOperator) op;
- toRemove.addAll(assign.getVariables());
- } else if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
- AggregateOperator agg = (AggregateOperator) op;
- toRemove.addAll(agg.getVariables());
- } else if (op.getOperatorTag() == LogicalOperatorTag.UNNEST) {
- UnnestOperator uOp = (UnnestOperator) op;
- LogicalVariable pVar = uOp.getPositionalVariable();
- if (pVar != null) {
- toRemove.add(pVar);
+ boolean removeUsedVars = true;
+ switch (op.getOperatorTag()) {
+ case ASSIGN: {
+ AssignOperator assign = (AssignOperator) op;
+ toRemove.addAll(assign.getVariables());
+ break;
+ }
+ case AGGREGATE: {
+ AggregateOperator agg = (AggregateOperator) op;
+ toRemove.addAll(agg.getVariables());
+ break;
+ }
+ case UNNEST: {
+ UnnestOperator uOp = (UnnestOperator) op;
+ LogicalVariable pVar = uOp.getPositionalVariable();
+ if (pVar != null) {
+ toRemove.add(pVar);
+ }
+ break;
+ }
+ case UNIONALL: {
+ UnionAllOperator unionOp = (UnionAllOperator) op;
+ for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> varMapping : unionOp
+ .getVariableMappings()) {
+ toRemove.add(varMapping.third);
+ }
+ removeUsedVars = false;
+ break;
}
}
- List<LogicalVariable> used = new LinkedList<LogicalVariable>();
- VariableUtilities.getUsedVariables(op, used);
- toRemove.removeAll(used);
+ if (removeUsedVars) {
+ List<LogicalVariable> used = new LinkedList<LogicalVariable>();
+ VariableUtilities.getUsedVariables(op, used);
+ toRemove.removeAll(used);
+ }
}
}
diff --git a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 38cf96e..60a4fbb 100644
--- a/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/fullstack/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -38,6 +38,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
@@ -245,6 +246,10 @@
op.setPhysicalOperator(new SinkWritePOperator());
break;
}
+ case DISTRIBUTE_RESULT: {
+ op.setPhysicalOperator(new DistributeResultPOperator());
+ break;
+ }
case WRITE_RESULT: {
WriteResultOperator opLoad = (WriteResultOperator) op;
LogicalVariable payload;
@@ -267,8 +272,8 @@
List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
getKeys(opInsDel.getSecondaryKeyExpressions(), secondaryKeys);
- op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys,
- opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+ op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys, opInsDel
+ .getFilterExpression(), opInsDel.getDataSourceIndex()));
break;
}
case SINK: {
diff --git a/fullstack/algebricks/algebricks-runtime/pom.xml b/fullstack/algebricks/algebricks-runtime/pom.xml
index e40dfb0..e438283 100644
--- a/fullstack/algebricks/algebricks-runtime/pom.xml
+++ b/fullstack/algebricks/algebricks-runtime/pom.xml
@@ -16,8 +16,9 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
</configuration>
</plugin>
</plugins>
diff --git a/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java
new file mode 100644
index 0000000..4f28c81
--- /dev/null
+++ b/fullstack/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.runtime.serializer;
+
+import java.io.PrintStream;
+import java.nio.BufferOverflowException;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IAWriter;
+import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IResultSerializerFactoryProvider;
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializer;
+import edu.uci.ics.hyracks.api.dataflow.value.IResultSerializerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ResultSerializerFactoryProvider implements IResultSerializerFactoryProvider { // builds result serializers that delegate to an IAWriter
+ private static final long serialVersionUID = 1L;
+
+ public static final ResultSerializerFactoryProvider INSTANCE = new ResultSerializerFactoryProvider(); // only instance created here; the constructor is private
+
+ private ResultSerializerFactoryProvider() {
+ }
+
+ @Override
+ public IResultSerializerFactory getAqlResultSerializerFactoryProvider(final int[] fields,
+ final IPrinterFactory[] printerFactories, final IAWriterFactory writerFactory) { // parameters are final because the anonymous factory below captures them
+ return new IResultSerializerFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public IResultSerializer createResultSerializer(RecordDescriptor inputRecordDesc, PrintStream printStream) {
+ final IAWriter writer = writerFactory.createWriter(fields, printStream, printerFactories,
+ inputRecordDesc); // one writer per serializer, created for the given stream and record descriptor
+
+ return new IResultSerializer() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void init() throws HyracksDataException {
+ try {
+ writer.init();
+ } catch (AlgebricksException e) { // rethrown as the exception type this method declares, wrapping the original
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public boolean appendTuple(IFrameTupleAccessor tAccess, int tIdx) throws HyracksDataException { // true if the tuple was printed, false on buffer overflow
+ try {
+ writer.printTuple(tAccess, tIdx);
+ } catch (BufferOverflowException e) { // overflow is reported as "not appended" instead of being rethrown; NOTE(review): presumably raised by the buffer behind printStream when full - confirm
+ return false;
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ return true;
+ }
+ };
+ }
+ };
+ }
+}
diff --git a/fullstack/algebricks/algebricks-tests/pom.xml b/fullstack/algebricks/algebricks-tests/pom.xml
index 19e6711..c114881 100644
--- a/fullstack/algebricks/algebricks-tests/pom.xml
+++ b/fullstack/algebricks/algebricks-tests/pom.xml
@@ -16,9 +16,11 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
- </configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
+ <encoding>UTF-8</encoding>
+ </configuration>
</plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
diff --git a/fullstack/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java b/fullstack/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
index b5c4b47..dad7cd0 100644
--- a/fullstack/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
+++ b/fullstack/algebricks/algebricks-tests/src/test/java/edu/uci/ics/hyracks/algebricks/tests/util/AlgebricksHyracksIntegrationUtil.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -55,6 +55,7 @@
ncConfig1.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
ncConfig1.clusterNetIPAddress = "127.0.0.1";
ncConfig1.dataIPAddress = "127.0.0.1";
+ ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -64,6 +65,7 @@
ncConfig2.ccPort = TEST_HYRACKS_CC_CLUSTER_NET_PORT;
ncConfig2.clusterNetIPAddress = "127.0.0.1";
ncConfig2.dataIPAddress = "127.0.0.1";
+ ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
@@ -85,4 +87,4 @@
hcc.waitForCompletion(jobId);
}
-}
\ No newline at end of file
+}