Moved MaterializeOperator and NestedSubplanToJoinRule to Hyracks.
Change-Id: I74f62bc26706fc72c1baf05f27ce8cdf219cb778
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/168
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <westmann@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index bface33..9ddd927 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -20,31 +20,32 @@
DATASOURCESCAN,
DISTINCT,
DISTRIBUTE_RESULT,
- GROUP,
EMPTYTUPLESOURCE,
EXCHANGE,
+ EXTENSION_OPERATOR,
+ EXTERNAL_LOOKUP,
+ GROUP,
+ INDEX_INSERT_DELETE,
INNERJOIN,
+ INSERT_DELETE,
LEFTOUTERJOIN,
LIMIT,
+ MATERIALIZE,
NESTEDTUPLESOURCE,
ORDER,
- PROJECT,
PARTITIONINGSPLIT,
+ PROJECT,
REPLICATE,
RUNNINGAGGREGATE,
SCRIPT,
SELECT,
SINK,
SUBPLAN,
+ TOKENIZE,
UNIONALL,
UNNEST,
UNNEST_MAP,
+ UPDATE,
WRITE,
WRITE_RESULT,
- INSERT_DELETE,
- INDEX_INSERT_DELETE,
- UPDATE,
- EXTENSION_OPERATOR,
- EXTERNAL_LOOKUP,
- TOKENIZE
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 48f8230..911bfa1 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -20,50 +20,51 @@
BROADCAST_EXCHANGE,
BTREE_SEARCH,
BULKLOAD,
- INDEX_BULKLOAD,
- STATS,
DATASOURCE_SCAN,
DISTRIBUTE_RESULT,
EMPTY_TUPLE_SOURCE,
+ EXTENSION_OPERATOR,
EXTERNAL_GROUP_BY,
- IN_MEMORY_HASH_JOIN,
+ EXTERNAL_LOOKUP,
HASH_GROUP_BY,
HASH_PARTITION_EXCHANGE,
HASH_PARTITION_MERGE_EXCHANGE,
- HYBRID_HASH_JOIN,
HDFS_READER,
+ HYBRID_HASH_JOIN,
+ IN_MEMORY_HASH_JOIN,
IN_MEMORY_STABLE_SORT,
+ INDEX_BULKLOAD,
+ INDEX_INSERT_DELETE,
+ INSERT_DELETE,
+ LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH,
+ MATERIALIZE,
MICRO_PRE_CLUSTERED_GROUP_BY,
NESTED_LOOP,
NESTED_TUPLE_SOURCE,
ONE_TO_ONE_EXCHANGE,
- PRE_SORTED_DISTINCT_BY,
+ PARTITIONINGSPLIT,
PRE_CLUSTERED_GROUP_BY,
- RANGE_PARTITION_EXCHANGE,
+ PRE_SORTED_DISTINCT_BY,
RANDOM_MERGE_EXCHANGE,
+ RANGE_PARTITION_EXCHANGE,
RTREE_SEARCH,
RUNNING_AGGREGATE,
- SORT_GROUP_BY,
- SORT_MERGE_EXCHANGE,
+ SINGLE_PARTITION_INVERTED_INDEX_SEARCH,
SINK,
SINK_WRITE,
+ SORT_GROUP_BY,
+ SORT_MERGE_EXCHANGE,
SPLIT,
STABLE_SORT,
+ STATS,
STREAM_LIMIT,
- STREAM_SELECT,
STREAM_PROJECT,
+ STREAM_SELECT,
STRING_STREAM_SCRIPT,
SUBPLAN,
+ TOKENIZE,
UNION_ALL,
UNNEST,
- WRITE_RESULT,
- INSERT_DELETE,
- INDEX_INSERT_DELETE,
UPDATE,
- SINGLE_PARTITION_INVERTED_INDEX_SEARCH,
- LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH,
- PARTITIONINGSPLIT,
- EXTENSION_OPERATOR,
- EXTERNAL_LOOKUP,
- TOKENIZE
+ WRITE_RESULT,
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/MaterializeOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/MaterializeOperator.java
new file mode 100644
index 0000000..5cecf07
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/MaterializeOperator.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class MaterializeOperator extends AbstractLogicalOperator {
+
+ @Override
+ public LogicalOperatorTag getOperatorTag() {
+ return LogicalOperatorTag.MATERIALIZE;
+ }
+
+ @Override
+ public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+ return visitor.visitMaterializeOperator(this, arg);
+ }
+
+ @Override
+ public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+ return createPropagatingAllInputsTypeEnvironment(ctx);
+ }
+
+ @Override
+ public VariablePropagationPolicy getVariablePropagationPolicy() {
+ return VariablePropagationPolicy.ALL;
+ }
+
+ @Override
+ public boolean isMap() {
+ return false;
+ }
+
+ @Override
+ public void recomputeSchema() throws AlgebricksException {
+ schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
+ }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 3f13977..2c5bd54 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -89,6 +89,10 @@
return this.outputArity = outputArity;
}
+ public void setOutputMaterializationFlags(boolean[] outputMaterializationFlags) {
+ this.outputMaterializationFlags = outputMaterializationFlags;
+ }
+
public boolean[] getOutputMaterializationFlags() {
return outputMaterializationFlags;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 266ead8..fcccee1 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -57,6 +57,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -404,6 +405,12 @@
}
@Override
+ public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext ctx) throws AlgebricksException {
+ propagateFDsAndEquivClasses(op, ctx);
+ return null;
+ }
+
+ @Override
public Void visitRunningAggregateOperator(RunningAggregateOperator op, IOptimizationContext ctx)
throws AlgebricksException {
ctx.putEquivalenceClassMap(op, new HashMap<LogicalVariable, EquivalenceClass>());
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 804c0e7..be83320 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -48,6 +48,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
@@ -272,6 +273,14 @@
}
@Override
+ public Boolean visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg) throws AlgebricksException {
+ AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+ if (aop.getOperatorTag() != LogicalOperatorTag.MATERIALIZE)
+ return Boolean.FALSE;
+ return Boolean.TRUE;
+ }
+
+ @Override
public Boolean visitScriptOperator(ScriptOperator op, ILogicalOperator arg) throws AlgebricksException {
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
if (aop.getOperatorTag() != LogicalOperatorTag.SCRIPT)
@@ -706,6 +715,11 @@
}
@Override
+ public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+ return new MaterializeOperator();
+ }
+
+ @Override
public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
ArrayList<LogicalVariable> newInputList = new ArrayList<LogicalVariable>();
ArrayList<LogicalVariable> newOutputList = new ArrayList<LogicalVariable>();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index ea8e82b..99df45c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -47,6 +47,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -170,6 +171,12 @@
}
@Override
+ public Void visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg) throws AlgebricksException {
+ mapVariablesStandard(op, arg);
+ return null;
+ }
+
+ @Override
public Void visitScriptOperator(ScriptOperator op, ILogicalOperator arg) throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 388d9f9..19fcf65 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -39,6 +39,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -187,6 +188,12 @@
}
@Override
+ public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext arg) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
public Void visitRunningAggregateOperator(RunningAggregateOperator op, IOptimizationContext context)
throws AlgebricksException {
visitAssignment(op, context);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index bf184e9..1fa97a6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -42,6 +42,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -235,6 +236,11 @@
}
@Override
+ public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
return null;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index b31bc3b..0b37b92 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -43,6 +43,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -271,6 +272,12 @@
}
@Override
+ public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+ standardLayout(op);
+ return null;
+ }
+
+ @Override
public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
standardLayout(op);
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index b2d6022..227cd53 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -43,6 +43,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
@@ -377,6 +378,12 @@
}
@Override
+ public Void visitMaterializeOperator(MaterializeOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+ throws AlgebricksException {
+ return null;
+ }
+
+ @Override
public Void visitInsertDeleteOperator(InsertDeleteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 0209492..8796da4 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -42,6 +42,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
@@ -379,6 +380,11 @@
}
@Override
+ public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+ return null;
+ }
+
+ @Override
public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
op.getDelegate().getUsedVariables(usedVariables);
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
new file mode 100644
index 0000000..27f4b25
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/MaterializePOperator.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
+
+public class MaterializePOperator extends AbstractPhysicalOperator {
+
+ private final boolean isSingleActivity;
+
+ public MaterializePOperator(boolean isSingleActivity) {
+ this.isSingleActivity = isSingleActivity;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.MATERIALIZE;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return emptyUnaryRequirements();
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+ propagatedSchema, context);
+ MaterializingOperatorDescriptor materializationOpDesc = new MaterializingOperatorDescriptor(
+ builder.getJobSpec(), recDescriptor, isSingleActivity);
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, materializationOpDesc);
+ ILogicalOperator src = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, op, 0);
+ }
+
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ int[] inputDependencyLabels = new int[] { 0 };
+ int[] outputDependencyLabels;
+ if (isSingleActivity) {
+ outputDependencyLabels = new int[] { 0 };
+ } else {
+ outputDependencyLabels = new int[] { 1 };
+ }
+ return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index a118ee8..2c924d5 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -41,6 +41,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -60,8 +61,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-public class LogicalOperatorPrettyPrintVisitor implements
- ILogicalOperatorVisitor<String, Integer> {
+public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisitor<String, Integer> {
ILogicalExpressionVisitor<String, Integer> exprVisitor;
@@ -69,42 +69,35 @@
exprVisitor = new LogicalExpressionPrettyPrintVisitor();
}
- public LogicalOperatorPrettyPrintVisitor(
- ILogicalExpressionVisitor<String, Integer> exprVisitor) {
+ public LogicalOperatorPrettyPrintVisitor(ILogicalExpressionVisitor<String, Integer> exprVisitor) {
this.exprVisitor = exprVisitor;
}
@Override
- public String visitAggregateOperator(AggregateOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitAggregateOperator(AggregateOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("aggregate ")
- .append(op.getVariables()).append(" <- ");
+ addIndent(buffer, indent).append("aggregate ").append(op.getVariables()).append(" <- ");
pprintExprList(op.getExpressions(), buffer, indent);
return buffer.toString();
}
@Override
- public String visitRunningAggregateOperator(RunningAggregateOperator op,
- Integer indent) throws AlgebricksException {
+ public String visitRunningAggregateOperator(RunningAggregateOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("running-aggregate ")
- .append(op.getVariables()).append(" <- ");
+ addIndent(buffer, indent).append("running-aggregate ").append(op.getVariables()).append(" <- ");
pprintExprList(op.getExpressions(), buffer, indent);
return buffer.toString();
}
@Override
- public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op,
- Integer indent) {
+ public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("empty-tuple-source");
return buffer.toString();
}
@Override
- public String visitGroupByOperator(GroupByOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitGroupByOperator(GroupByOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("group by (");
pprintVeList(buffer, op.getGroupByList(), indent);
@@ -116,8 +109,7 @@
}
@Override
- public String visitDistinctOperator(DistinctOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitDistinctOperator(DistinctOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("distinct " + "(");
pprintExprList(op.getExpressions(), buffer, indent);
@@ -126,75 +118,62 @@
}
@Override
- public String visitInnerJoinOperator(InnerJoinOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitInnerJoinOperator(InnerJoinOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent)
- .append("join (")
- .append(op.getCondition().getValue()
- .accept(exprVisitor, indent)).append(")");
+ addIndent(buffer, indent).append("join (").append(op.getCondition().getValue().accept(exprVisitor, indent))
+ .append(")");
return buffer.toString();
}
@Override
- public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op,
- Integer indent) throws AlgebricksException {
+ public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent)
- .append("left outer join (")
- .append(op.getCondition().getValue()
- .accept(exprVisitor, indent)).append(")");
+ addIndent(buffer, indent).append("left outer join (")
+ .append(op.getCondition().getValue().accept(exprVisitor, indent)).append(")");
return buffer.toString();
}
@Override
- public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
- Integer indent) {
+ public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("nested tuple source");
return buffer.toString();
}
@Override
- public String visitOrderOperator(OrderOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitOrderOperator(OrderOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("order ");
- for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op
- .getOrderExpressions()) {
+ for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
String fst;
switch (p.first.getKind()) {
- case ASC: {
- fst = "ASC";
- break;
+ case ASC: {
+ fst = "ASC";
+ break;
+ }
+ case DESC: {
+ fst = "DESC";
+ break;
+ }
+ default: {
+ fst = p.first.getExpressionRef().toString();
+ }
}
- case DESC: {
- fst = "DESC";
- break;
- }
- default: {
- fst = p.first.getExpressionRef().toString();
- }
- }
- buffer.append("(" + fst + ", "
- + p.second.getValue().accept(exprVisitor, indent) + ") ");
+ buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") ");
}
return buffer.toString();
}
@Override
- public String visitAssignOperator(AssignOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitAssignOperator(AssignOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("assign ").append(op.getVariables())
- .append(" <- ");
+ addIndent(buffer, indent).append("assign ").append(op.getVariables()).append(" <- ");
pprintExprList(op.getExpressions(), buffer, indent);
return buffer.toString();
}
@Override
- public String visitWriteOperator(WriteOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitWriteOperator(WriteOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("write ");
pprintExprList(op.getExpressions(), buffer, indent);
@@ -202,8 +181,7 @@
}
@Override
- public String visitDistributeResultOperator(DistributeResultOperator op,
- Integer indent) throws AlgebricksException {
+ public String visitDistributeResultOperator(DistributeResultOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("distribute result ");
pprintExprList(op.getExpressions(), buffer, indent);
@@ -211,42 +189,32 @@
}
@Override
- public String visitWriteResultOperator(WriteResultOperator op,
- Integer indent) throws AlgebricksException {
+ public String visitWriteResultOperator(WriteResultOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent)
- .append("load ")
- .append(op.getDataSource())
- .append(" from ")
- .append(op.getPayloadExpression().getValue()
- .accept(exprVisitor, indent))
- .append(" partitioned by ");
+ addIndent(buffer, indent).append("load ").append(op.getDataSource()).append(" from ")
+ .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
pprintExprList(op.getKeyExpressions(), buffer, indent);
return buffer.toString();
}
@Override
- public String visitSelectOperator(SelectOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitSelectOperator(SelectOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent)
- .append("select (")
- .append(op.getCondition().getValue()
- .accept(exprVisitor, indent)).append(")");
+ addIndent(buffer, indent).append("select (").append(op.getCondition().getValue().accept(exprVisitor, indent))
+ .append(")");
return buffer.toString();
}
@Override
public String visitProjectOperator(ProjectOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append(
- "project " + "(" + op.getVariables() + ")");
+ addIndent(buffer, indent).append("project " + "(" + op.getVariables() + ")");
return buffer.toString();
}
@Override
- public String visitPartitioningSplitOperator(PartitioningSplitOperator op,
- Integer indent) throws AlgebricksException {
+ public String visitPartitioningSplitOperator(PartitioningSplitOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("partitioning-split (");
pprintExprList(op.getExpressions(), buffer, indent);
@@ -255,8 +223,7 @@
}
@Override
- public String visitSubplanOperator(SubplanOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitSubplanOperator(SubplanOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("subplan {");
printNestedPlans(op, indent, buffer);
@@ -267,58 +234,44 @@
public String visitUnionOperator(UnionAllOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("union");
- for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> v : op
- .getVariableMappings()) {
- buffer.append(" (" + v.first + ", " + v.second + ", " + v.third
- + ")");
+ for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> v : op.getVariableMappings()) {
+ buffer.append(" (" + v.first + ", " + v.second + ", " + v.third + ")");
}
return buffer.toString();
}
@Override
- public String visitUnnestOperator(UnnestOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitUnnestOperator(UnnestOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("unnest " + op.getVariable());
if (op.getPositionalVariable() != null) {
buffer.append(" at " + op.getPositionalVariable());
}
- buffer.append(" <- "
- + op.getExpressionRef().getValue().accept(exprVisitor, indent));
+ buffer.append(" <- " + op.getExpressionRef().getValue().accept(exprVisitor, indent));
return buffer.toString();
}
@Override
- public String visitUnnestMapOperator(UnnestMapOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitUnnestMapOperator(UnnestMapOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append(
- "unnest-map "
- + op.getVariables()
- + " <- "
- + op.getExpressionRef().getValue()
- .accept(exprVisitor, indent));
+ "unnest-map " + op.getVariables() + " <- "
+ + op.getExpressionRef().getValue().accept(exprVisitor, indent));
return buffer.toString();
}
@Override
- public String visitDataScanOperator(DataSourceScanOperator op,
- Integer indent) {
+ public String visitDataScanOperator(DataSourceScanOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append(
- "data-scan " + op.getProjectVariables() + "<-"
- + op.getVariables() + " <- " + op.getDataSource());
+ "data-scan " + op.getProjectVariables() + "<-" + op.getVariables() + " <- " + op.getDataSource());
return buffer.toString();
}
@Override
- public String visitLimitOperator(LimitOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitLimitOperator(LimitOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append(
- "limit "
- + op.getMaxObjects().getValue()
- .accept(exprVisitor, indent));
+ addIndent(buffer, indent).append("limit " + op.getMaxObjects().getValue().accept(exprVisitor, indent));
ILogicalExpression offset = op.getOffset().getValue();
if (offset != null) {
buffer.append(", " + offset.accept(exprVisitor, indent));
@@ -337,32 +290,30 @@
public String visitScriptOperator(ScriptOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append(
- "script (in: " + op.getInputVariables() + ") (out: "
- + op.getOutputVariables() + ")");
+ "script (in: " + op.getInputVariables() + ") (out: " + op.getOutputVariables() + ")");
return buffer.toString();
}
@Override
- public String visitReplicateOperator(ReplicateOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitReplicateOperator(ReplicateOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("replicate ");
return buffer.toString();
}
@Override
- public String visitInsertDeleteOperator(InsertDeleteOperator op,
- Integer indent) throws AlgebricksException {
+ public String visitMaterializeOperator(MaterializeOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- String header = op.getOperation() == Kind.INSERT ? "insert into "
- : "delete from ";
- addIndent(buffer, indent)
- .append(header)
- .append(op.getDataSource())
- .append(" from ")
- .append(op.getPayloadExpression().getValue()
- .accept(exprVisitor, indent))
- .append(" partitioned by ");
+ addIndent(buffer, indent).append("materialize ");
+ return buffer.toString();
+ }
+
+ @Override
+ public String visitInsertDeleteOperator(InsertDeleteOperator op, Integer indent) throws AlgebricksException {
+ StringBuilder buffer = new StringBuilder();
+ String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+ addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from ")
+ .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
if (op.isBulkload()) {
buffer.append(" [bulkload]");
@@ -371,14 +322,12 @@
}
@Override
- public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op,
- Integer indent) throws AlgebricksException {
+ public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- String header = op.getOperation() == Kind.INSERT ? "insert into "
- : "delete from ";
- addIndent(buffer, indent).append(header).append(op.getIndexName())
- .append(" on ").append(op.getDataSourceIndex().getDataSource())
- .append(" from ");
+ String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+ addIndent(buffer, indent).append(header).append(op.getIndexName()).append(" on ")
+ .append(op.getDataSourceIndex().getDataSource()).append(" from ");
pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
if (op.isBulkload()) {
buffer.append(" [bulkload]");
@@ -387,41 +336,36 @@
}
@Override
- public String visitTokenizeOperator(TokenizeOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitTokenizeOperator(TokenizeOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("tokenize ")
- .append(op.getTokenizeVars()).append(" <- ");
+ addIndent(buffer, indent).append("tokenize ").append(op.getTokenizeVars()).append(" <- ");
pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
return buffer.toString();
}
@Override
- public String visitSinkOperator(SinkOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitSinkOperator(SinkOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("sink");
return buffer.toString();
}
@Override
- public String visitExtensionOperator(ExtensionOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitExtensionOperator(ExtensionOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append(op.toString());
return buffer.toString();
}
- protected static final StringBuilder addIndent(StringBuilder buffer,
- int level) {
+ protected static final StringBuilder addIndent(StringBuilder buffer, int level) {
for (int i = 0; i < level; ++i) {
buffer.append(' ');
}
return buffer;
}
- protected void printNestedPlans(AbstractOperatorWithNestedPlans op,
- Integer indent, StringBuilder buffer) throws AlgebricksException {
+ protected void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent, StringBuilder buffer)
+ throws AlgebricksException {
boolean first = true;
if (op.getNestedPlans().isEmpty()) {
buffer.append("}");
@@ -441,9 +385,8 @@
}
}
- protected void pprintExprList(
- List<Mutable<ILogicalExpression>> expressions,
- StringBuilder buffer, Integer indent) throws AlgebricksException {
+ protected void pprintExprList(List<Mutable<ILogicalExpression>> expressions, StringBuilder buffer, Integer indent)
+ throws AlgebricksException {
buffer.append("[");
boolean first = true;
for (Mutable<ILogicalExpression> exprRef : expressions) {
@@ -457,9 +400,7 @@
buffer.append("]");
}
- protected void pprintVeList(
- StringBuilder sb,
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList,
+ protected void pprintVeList(StringBuilder sb, List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList,
Integer indent) throws AlgebricksException {
sb.append("[");
boolean fst = true;
@@ -479,13 +420,11 @@
}
@Override
- public String visitExternalDataLookupOperator(
- ExternalDataLookupOperator op, Integer indent)
+ public String visitExternalDataLookupOperator(ExternalDataLookupOperator op, Integer indent)
throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append(
- "external-instant-lookup " + op.getVariables() + " <- "
- + op.getExpressionRef().getValue());
+ "external-instant-lookup " + op.getVariables() + " <- " + op.getExpressionRef().getValue());
return buffer.toString();
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index c753042..359e243 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -30,6 +30,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
@@ -79,6 +80,8 @@
public R visitReplicateOperator(ReplicateOperator op, T arg) throws AlgebricksException;
+ public R visitMaterializeOperator(MaterializeOperator op, T arg) throws AlgebricksException;
+
public R visitScriptOperator(ScriptOperator op, T arg) throws AlgebricksException;
public R visitSubplanOperator(SubplanOperator op, T arg) throws AlgebricksException;
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 691006f..3da9e76 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -18,7 +18,6 @@
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -487,7 +486,7 @@
}
}
- private boolean worthMaterialization(Mutable<ILogicalOperator> candidate) {
+ protected boolean worthMaterialization(Mutable<ILogicalOperator> candidate) {
AbstractLogicalOperator aop = (AbstractLogicalOperator) candidate.getValue();
if (aop.getPhysicalOperator().expensiveThanMaterialization()) {
return true;
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java
new file mode 100644
index 0000000..6484202
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/NestedSubplanToJoinRule.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * replace Subplan operators with nested loop joins where the join condition is true, if the Subplan
+ * does not contain free variables (does not have correlations to the input stream).
+ *
+ * @author yingyib
+ */
+public class NestedSubplanToJoinRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ if (context.checkIfInDontApplySet(this, opRef.getValue()))
+ return false;
+ context.addToDontApplySet(this, opRef.getValue());
+
+ ILogicalOperator op1 = opRef.getValue();
+ if (op1.getInputs().size() == 0) {
+ return false;
+ }
+
+ boolean rewritten = false;
+ for (int index = 0; index < op1.getInputs().size(); index++) {
+ AbstractLogicalOperator child = (AbstractLogicalOperator) op1.getInputs().get(index).getValue();
+ if (child.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+ continue;
+ }
+
+ AbstractOperatorWithNestedPlans subplan = (AbstractOperatorWithNestedPlans) child;
+ Set<LogicalVariable> freeVars = new HashSet<LogicalVariable>();
+ OperatorPropertiesUtil.getFreeVariablesInSubplans(subplan, freeVars);
+ if (!freeVars.isEmpty()) {
+ /**
+ * the subplan is correlated with the outer plan, other rules can deal with it
+ */
+ continue;
+ }
+
+ /** get the input operator of the subplan operator */
+ ILogicalOperator subplanInput = subplan.getInputs().get(0).getValue();
+ AbstractLogicalOperator subplanInputOp = (AbstractLogicalOperator) subplanInput;
+
+ /** If the other join branch is a trivial plan, do not do the rewriting. */
+ if (subplanInputOp.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
+ continue;
+ }
+
+ /** get all nested top operators */
+ List<ILogicalPlan> nestedPlans = subplan.getNestedPlans();
+ List<Mutable<ILogicalOperator>> nestedRoots = new ArrayList<Mutable<ILogicalOperator>>();
+ for (ILogicalPlan nestedPlan : nestedPlans) {
+ nestedRoots.addAll(nestedPlan.getRoots());
+ }
+ if (nestedRoots.size() == 0) {
+ /** there is no nested top operators */
+ continue;
+ }
+
+ /** expend the input and roots into a DAG of nested loop joins */
+ Mutable<ILogicalExpression> expr = new MutableObject<ILogicalExpression>(ConstantExpression.TRUE);
+ Mutable<ILogicalOperator> nestedRootRef = nestedRoots.get(0);
+ ILogicalOperator join = new LeftOuterJoinOperator(expr, new MutableObject<ILogicalOperator>(subplanInput),
+ nestedRootRef);
+
+ /** rewrite the nested tuple source to be empty tuple source */
+ rewriteNestedTupleSource(nestedRootRef);
+
+ for (int i = 1; i < nestedRoots.size(); i++) {
+ join = new LeftOuterJoinOperator(expr, new MutableObject<ILogicalOperator>(join), nestedRoots.get(i));
+ }
+ op1.getInputs().get(index).setValue(join);
+ context.computeAndSetTypeEnvironmentForOperator(join);
+ rewritten = true;
+ }
+ return rewritten;
+ }
+
+ /**
+ * rewrite NestedTupleSource operators to EmptyTupleSource operators
+ *
+ * @param nestedRootRef
+ */
+ private void rewriteNestedTupleSource(Mutable<ILogicalOperator> nestedRootRef) {
+ AbstractLogicalOperator nestedRoot = (AbstractLogicalOperator) nestedRootRef.getValue();
+ if (nestedRoot.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+ nestedRootRef.setValue(new EmptyTupleSourceOperator());
+ }
+ List<Mutable<ILogicalOperator>> inputs = nestedRoot.getInputs();
+ for (Mutable<ILogicalOperator> input : inputs) {
+ rewriteNestedTupleSource(input);
+ }
+ }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushMapOperatorDownThroughProductRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushMapOperatorDownThroughProductRule.java
new file mode 100644
index 0000000..16a71a4
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushMapOperatorDownThroughProductRule.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class PushMapOperatorDownThroughProductRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+ if (!op1.isMap()) {
+ return false;
+ }
+ Mutable<ILogicalOperator> op2Ref = op1.getInputs().get(0);
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op2Ref.getValue();
+ if (op2.getOperatorTag() != LogicalOperatorTag.INNERJOIN) {
+ return false;
+ }
+ AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op2;
+ if (!OperatorPropertiesUtil.isAlwaysTrueCond(join.getCondition().getValue())) {
+ return false;
+ }
+
+ List<LogicalVariable> used = new ArrayList<LogicalVariable>();
+ VariableUtilities.getUsedVariables(op1, used);
+
+ Mutable<ILogicalOperator> b0Ref = op2.getInputs().get(0);
+ ILogicalOperator b0 = b0Ref.getValue();
+ List<LogicalVariable> b0Scm = new ArrayList<LogicalVariable>();
+ VariableUtilities.getLiveVariables(b0, b0Scm);
+ if (b0Scm.containsAll(used)) {
+ // push operator on left branch
+ op2Ref.setValue(b0);
+ b0Ref.setValue(op1);
+ opRef.setValue(op2);
+ return true;
+ } else {
+ Mutable<ILogicalOperator> b1Ref = op2.getInputs().get(1);
+ ILogicalOperator b1 = b1Ref.getValue();
+ List<LogicalVariable> b1Scm = new ArrayList<LogicalVariable>();
+ VariableUtilities.getLiveVariables(b1, b1Scm);
+ if (b1Scm.containsAll(used)) {
+ // push operator on right branch
+ op2Ref.setValue(b1);
+ b1Ref.setValue(op1);
+ opRef.setValue(op2);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
index e1c8463..f6c6ad8 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SimpleUnnestToProductRule.java
@@ -33,6 +33,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -76,16 +77,20 @@
Mutable<ILogicalOperator> tupleSourceOpRef = currentOpRef;
currentOpRef = opRef;
if (tupleSourceOpRef.getValue().getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
- while (currentOpRef.getValue().getInputs().size() == 1
- && currentOpRef.getValue() instanceof AbstractScanOperator
- && descOrSelfIsSourceScan((AbstractLogicalOperator) currentOpRef.getValue())) {
- if (opsAreIndependent(currentOpRef.getValue(), tupleSourceOpRef.getValue())) {
- /** move down the boundary if the operator is independent of the tuple source */
- boundaryOpRef = currentOpRef.getValue().getInputs().get(0);
- } else {
- break;
+ NestedTupleSourceOperator nts = (NestedTupleSourceOperator) tupleSourceOpRef.getValue();
+ // If the subplan input is a trivial plan, do not do the rewriting.
+ if (nts.getSourceOperator().getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE) {
+ while (currentOpRef.getValue().getInputs().size() == 1
+ && currentOpRef.getValue() instanceof AbstractScanOperator
+ && descOrSelfIsSourceScan((AbstractLogicalOperator) currentOpRef.getValue())) {
+ if (opsAreIndependent(currentOpRef.getValue(), tupleSourceOpRef.getValue())) {
+ /** move down the boundary if the operator is independent of the tuple source */
+ boundaryOpRef = currentOpRef.getValue().getInputs().get(0);
+ } else {
+ break;
+ }
+ currentOpRef = currentOpRef.getValue().getInputs().get(0);
}
- currentOpRef = currentOpRef.getValue().getInputs().get(0);
}
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 1bfa413..8887b82 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -109,8 +109,8 @@
public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
int recordsPerFrame, double factor, int[] keys0, int[] keys1,
IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
- RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1)
- throws HyracksDataException {
+ RecordDescriptor recordDescriptor, IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter,
+ INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
@@ -181,7 +181,8 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) throws HyracksDataException {
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
@@ -194,7 +195,8 @@
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
}
}
- final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+ final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
+ .createPredicateEvaluator());
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
@@ -384,7 +386,8 @@
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
}
}
- final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
+ final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
+ .createPredicateEvaluator());
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private BuildAndPartitionTaskState state;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 744f939..5cab373 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -115,24 +115,23 @@
private final ITuplePairComparatorFactory tuplePairComparatorFactory0; //For NLJ in probe
private final ITuplePairComparatorFactory tuplePairComparatorFactory1; //For NLJ in probe
private final IPredicateEvaluatorFactory predEvaluatorFactory;
-
+
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
-
- //Flags added for test purpose
+
+ //Flags added for test purpose
private static boolean skipInMemoryHJ = false;
private static boolean forceNLJ = false;
private static boolean forceRR = false;
-
-
+
private static final Logger LOGGER = Logger.getLogger(OptimizedHybridHashJoinOperatorDescriptor.class.getName());
public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
ITuplePairComparatorFactory tupPaircomparatorFactory0,
- ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory, boolean isLeftOuter,
- INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+ ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory,
+ boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
super(spec, 2, 1);
this.memsize = memsize;
@@ -153,7 +152,8 @@
public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories,
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
- ITuplePairComparatorFactory tupPaircomparatorFactory0, ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory)
+ ITuplePairComparatorFactory tupPaircomparatorFactory0,
+ ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory)
throws HyracksDataException {
super(spec, 2, 1);
@@ -214,7 +214,7 @@
}
public static class BuildAndPartitionTaskState extends AbstractStateObject {
-
+
private int memForJoin;
private int numOfPartitions;
private OptimizedHybridHashJoin hybridHJ;
@@ -266,10 +266,10 @@
for (int i = 0; i < comparatorFactories.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
-
- final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
-
-
+
+ final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
+ .createPredicateEvaluator());
+
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
.getJobId(), new TaskId(getActivityId(), partition));
@@ -287,17 +287,16 @@
state.memForJoin = memsize - 2;
state.numOfPartitions = getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor,
nPartitions);
- if(!isLeftOuter){
- state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+ if (!isLeftOuter) {
+ state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
buildHpc, predEvaluator);
- }
- else{
- state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
+ } else {
+ state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
buildHpc, predEvaluator, isLeftOuter, nullWriterFactories1);
}
-
+
state.hybridHJ.initBuild();
}
@@ -331,7 +330,7 @@
* Hybrid Hash Join recursively on them.
*/
private class ProbeAndJoinActivityNode extends AbstractActivityNode {
-
+
private static final long serialVersionUID = 1L;
private final ActivityId buildAid;
@@ -351,8 +350,9 @@
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx);
final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx);
- final IPredicateEvaluator predEvaluator = ( predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator());
-
+ final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
+ .createPredicateEvaluator());
+
for (int i = 0; i < comparatorFactories.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
@@ -388,7 +388,7 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- state.hybridHJ.probe(buffer, writer);
+ state.hybridHJ.probe(buffer, writer);
}
@Override
@@ -425,17 +425,22 @@
}
private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
- RunFileReader probeSideReader, int pid, int beforeMax, int level, boolean wasReversed) throws HyracksDataException {
+ RunFileReader probeSideReader, int pid, int beforeMax, int level, boolean wasReversed)
+ throws HyracksDataException {
ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
-
- long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize()) : (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize());
- long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize());
-
- LOGGER.fine("\n>>>Joining Partition Pairs (pid "+pid+") - (level "+level+") - wasReversed "+wasReversed+" - BuildSize:\t"+buildPartSize+"\tProbeSize:\t"+probePartSize+" - MemForJoin "+(state.memForJoin)+" - LeftOuter is "+isLeftOuter);
-
+
+ long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getFrameSize()) : (ohhj
+ .getBuildPartitionSize(pid) / ctx.getFrameSize());
+ long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj
+ .getProbePartitionSize(pid) / ctx.getFrameSize());
+
+ LOGGER.fine("\n>>>Joining Partition Pairs (pid " + pid + ") - (level " + level + ") - wasReversed "
+ + wasReversed + " - BuildSize:\t" + buildPartSize + "\tProbeSize:\t" + probePartSize
+ + " - MemForJoin " + (state.memForJoin) + " - LeftOuter is " + isLeftOuter);
+
//Apply in-Mem HJ if possible
if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)
|| (probePartSize < state.memForJoin && !isLeftOuter)) {
@@ -468,14 +473,16 @@
}
//Apply (Recursive) HHJ
else {
- LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level "+level+"]");
+ LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
OptimizedHybridHashJoin rHHj;
if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
- LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
+ LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+ + level + "]");
int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
nPartitions);
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
- probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator); //checked-confirmed
+ probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc,
+ predEvaluator); //checked-confirmed
buildSideReader.open();
rHHj.initBuild();
@@ -500,8 +507,9 @@
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitinStatus();
- if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
- LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
+ LOGGER.fine("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+ + level + "]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -514,11 +522,12 @@
}
} else { //Case 2.1.2 - Switch to NLJ
- LOGGER.fine("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "+level+"]");
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ LOGGER.fine("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+ + level + "]");
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
+
if (rbrfw == null || rprfw == null) {
continue;
}
@@ -526,22 +535,23 @@
int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
if (isLeftOuter || buildSideInTups < probeSideInTups) {
- applyNestedLoopJoin(buildRd, probeRd, memsize, rprfw, rbrfw,
- nljComparator0, false); //checked-modified
+ applyNestedLoopJoin(buildRd, probeRd, memsize, rprfw, rbrfw, nljComparator0,
+ false); //checked-modified
} else {
- applyNestedLoopJoin(probeRd, buildRd, memsize, rbrfw, rprfw,
- nljComparator1, true); //checked-modified
+ applyNestedLoopJoin(probeRd, buildRd, memsize, rbrfw, rprfw, nljComparator1,
+ true); //checked-modified
}
}
}
} else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
- LOGGER.fine("\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
+ LOGGER.fine("\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level " + level + "]");
int n = getNumberOfPartitions(state.memForJoin, (int) probePartSize, fudgeFactor,
nPartitions);
-
+
rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, BUILD_REL, PROBE_REL,
- buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc, predEvaluator); //checked-confirmed
- rHHj.setIsReversed(true); //Added to use predicateEvaluator (for inMemoryHashJoin) correctly
+ buildKeys, probeKeys, comparators, buildRd, probeRd, buildHpc, probeHpc,
+ predEvaluator); //checked-confirmed
+ rHHj.setIsReversed(true); //Added to use predicateEvaluator (for inMemoryHashJoin) correctly
probeSideReader.open();
rHHj.initBuild();
@@ -563,8 +573,9 @@
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitinStatus();
- if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
- LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
+ LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "
+ + level + "]");
for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -573,14 +584,15 @@
continue;
}
- joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true); //checked-confirmed
+ joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true); //checked-confirmed
}
} else { //Case 2.2.2 - Switch to NLJ
- LOGGER.fine("\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "+level+"]");
- for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ LOGGER.fine("\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "
+ + level + "]");
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
-
+
if (rbrfw == null || rprfw == null) {
continue;
}
@@ -588,11 +600,11 @@
long buildSideSize = rbrfw.getFileSize();
long probeSideSize = rprfw.getFileSize();
if (buildSideSize > probeSideSize) {
- applyNestedLoopJoin(buildRd, probeRd, memsize, rbrfw, rprfw,
- nljComparator0, true); //checked-modified
+ applyNestedLoopJoin(buildRd, probeRd, memsize, rbrfw, rprfw, nljComparator0,
+ true); //checked-modified
} else {
- applyNestedLoopJoin(probeRd, buildRd, memsize, rprfw, rbrfw,
- nljComparator1, true); //checked-modified
+ applyNestedLoopJoin(probeRd, buildRd, memsize, rprfw, rbrfw, nljComparator1,
+ true); //checked-modified
}
}
}
@@ -604,8 +616,8 @@
private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepLarger,
- ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader, boolean reverse, int pid)
- throws HyracksDataException {
+ ITuplePartitionComputer hpcRepSmaller, RunFileReader bReader, RunFileReader pReader,
+ boolean reverse, int pid) throws HyracksDataException {
ISerializableTable table = new SerializableHashTable(tabSize, ctx);
InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(
ctx.getFrameSize(), probeRDesc), hpcRepLarger, new FrameTupleAccessor(ctx.getFrameSize(),
@@ -623,7 +635,7 @@
}
bReader.close();
rPartbuff.clear();
- // probe
+ // probe
pReader.open();
while (pReader.nextFrame(rPartbuff)) {
joiner.join(rPartbuff, writer);
@@ -634,13 +646,13 @@
}
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
- RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator, boolean reverse)
- throws HyracksDataException {
- NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
- new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize, predEvaluator, isLeftOuter, nullWriters1);
- nlj.setIsReversed(reverse);
-
-
+ RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator,
+ boolean reverse) throws HyracksDataException {
+ NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(ctx.getFrameSize(), outerRd),
+ new FrameTupleAccessor(ctx.getFrameSize(), innerRd), nljComparator, memorySize,
+ predEvaluator, isLeftOuter, nullWriters1);
+ nlj.setIsReversed(reverse);
+
ByteBuffer cacheBuff = ctx.allocateFrame();
innerReader.open();
while (innerReader.nextFrame(cacheBuff)) {
@@ -667,16 +679,16 @@
return op;
}
}
-
- public void setSkipInMemHJ(boolean b){
- skipInMemoryHJ = b;
+
+ public void setSkipInMemHJ(boolean b) {
+ skipInMemoryHJ = b;
}
-
- public void setForceNLJ(boolean b){
- forceNLJ = b;
+
+ public void setForceNLJ(boolean b) {
+ forceNLJ = b;
}
-
- public void setForceRR(boolean b){
- forceRR = (!isLeftOuter && b);
+
+ public void setForceRR(boolean b) {
+ forceRR = (!isLeftOuter && b);
}
}
\ No newline at end of file