Added replicate operator with materialization
be more aggressive to find shared plans in ExtractCommonOperatorRule
- find all the isomorphic subgraphs instead of just the ones on join build branches
- while expanding candidates handle the operators with multiple inputs
- analyze the DAG to find all the operators that can be co-scheduled, and infer the dependencies between clusters
- based on the dependencies, decide which outputs of a replicate operator needs materialization
- if the shared branch needs materialization, and it consists of only trivial operators (such as assign, unnest, datasource scan), that branch is discarded from the candidates
- modified the replicate operator descriptor to materialize the input if needed, and read from the materialized file for the outputs that requires materialization
- removed redundant decor variables in group-by
- fixed a bug on computing live variables for unnest-map operator: if the operator does not propagate inputs, those input variables should not be live anymore
- fixed a bug in ComplexUnnestToProductRule
Change-Id: If221d1507844f9409bf1163f93b0c04ef5848578
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/86
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index f3f9837..dafe300 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.algebricks.core.algebra.base;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -57,4 +58,18 @@
public void setHostQueryContext(Object context);
public Object getHostQueryContext();
+
+ /**
+ * @return labels (0 or 1) for each input and output indicating the dependency between them.
+ * The edges labeled as 1 must wait for the edges with label 0.
+ */
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op);
+
+ /*
+ * This is needed to have a kind of cost based decision on whether to merge the shared subplans and materialize the result.
+ * If the subgraph whose result we would like to materialize has an operator that is computationally expensive, we assume
+ * it is cheaper to materialize the result of this subgraph and read from the file rather than recomputing it.
+ */
+ public boolean expensiveThanMaterialization();
+
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
index e5ece69..3d8c4b6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DistinctOperator.java
@@ -99,7 +99,7 @@
@Override
public boolean isMap() {
return false;
- }
+ }
public List<LogicalVariable> getDistinctByVarList() {
List<LogicalVariable> varList = new ArrayList<LogicalVariable>(expressions.size());
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
index f738b50..a456e71 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupByOperator.java
@@ -273,5 +273,4 @@
}
return env;
}
-
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 6dbd3f5..3f13977 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -15,8 +15,12 @@
package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
@@ -28,9 +32,19 @@
public class ReplicateOperator extends AbstractLogicalOperator {
private int outputArity = 2;
+ private boolean[] outputMaterializationFlags = new boolean[outputArity];
+ private List<Mutable<ILogicalOperator>> outputs;
public ReplicateOperator(int outputArity) {
this.outputArity = outputArity;
+ this.outputMaterializationFlags = new boolean[outputArity];
+ this.outputs = new ArrayList<Mutable<ILogicalOperator>>();
+ }
+
+ public ReplicateOperator(int outputArity, boolean[] outputMaterializationFlags) {
+ this.outputArity = outputArity;
+ this.outputMaterializationFlags = outputMaterializationFlags;
+ this.outputs = new ArrayList<Mutable<ILogicalOperator>>();
}
@Override
@@ -71,9 +85,29 @@
return outputArity;
}
+ public int setOutputArity(int outputArity) {
+ return this.outputArity = outputArity;
+ }
+
+ public boolean[] getOutputMaterializationFlags() {
+ return outputMaterializationFlags;
+ }
+
+ public List<Mutable<ILogicalOperator>> getOutputs() {
+ return outputs;
+ }
+
@Override
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
return createPropagatingAllInputsTypeEnvironment(ctx);
}
+ public boolean isBlocker() {
+ for (boolean requiresMaterialization : outputMaterializationFlags) {
+ if (requiresMaterialization) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
index 61e0b33..b7a56be 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/UnnestMapOperator.java
@@ -131,5 +131,4 @@
return !propagateInput;
}
*/
-
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 1d0fd7c..b8cc3b8 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -224,7 +224,11 @@
@Override
public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
- standardLayout(op);
+ if (op.propagatesInput()) {
+ standardLayout(op);
+ } else {
+ VariableUtilities.getProducedVariables(op, schemaVariables);
+ }
return null;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
index 25fb453..42d964d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
@@ -40,6 +40,11 @@
return false;
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+
public abstract Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(
IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context)
throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
index 03f03fd..fcc04ab 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
public abstract class AbstractJoinPOperator extends AbstractPhysicalOperator {
@@ -37,4 +39,16 @@
public JoinPartitioningType getPartitioningType() {
return partitioningType;
}
+
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ int[] inputDependencyLabels = new int[] { 1, 0 };
+ int[] outputDependencyLabels = new int[] { 1 };
+ return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index ac64882..4407b75 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -79,6 +80,17 @@
return disableJobGenBelow;
}
+ /**
+ * @return labels (0 or 1) for each input and output indicating the dependency between them.
+ * The edges labeled as 1 must wait for the edges with label 0.
+ */
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ int[] inputDependencyLabels = new int[op.getInputs().size()]; // filled with 0's
+ int[] outputDependencyLabels = new int[] { 0 };
+ return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+ }
+
protected void contributeOpDesc(IHyracksJobBuilder builder, AbstractLogicalOperator op, IOperatorDescriptor opDesc) {
if (op.getExecutionMode() == ExecutionMode.UNPARTITIONED) {
AlgebricksPartitionConstraint apc = new AlgebricksCountPartitionConstraint(1);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index c375a3e..74adf5b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -249,4 +249,8 @@
return null;
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
index 18b442b..299f519 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractScanPOperator.java
@@ -26,4 +26,8 @@
return emptyUnaryRequirements();
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index ffc75c8..8cfe067 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -127,4 +127,15 @@
}
}
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ int[] inputDependencyLabels = new int[] { 0 };
+ int[] outputDependencyLabels = new int[] { 1 };
+ return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 99aec26..8b1b447 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -114,4 +114,8 @@
return true;
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 55da00e..3e87281 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -99,4 +99,8 @@
this.flushFramesRapidly = flushFramesRapidly;
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 1ba07be..54fdf2c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -106,4 +106,9 @@
ILogicalOperator src = resultOp.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, resultOp, 0);
}
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
index 107aebd..b714979 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/EmptyTupleSourcePOperator.java
@@ -65,4 +65,8 @@
builder.contributeMicroOperator(op, runtime, recDesc);
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index da70ba7..fe438a5 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -256,4 +256,16 @@
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
}
+
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ int[] inputDependencyLabels = new int[] { 0 };
+ int[] outputDependencyLabels = new int[] { 1 };
+ return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
index 0c07c4c..b56d638 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
@@ -124,4 +124,8 @@
return false;
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
index 5829911..d254151 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
@@ -114,4 +114,8 @@
return false;
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
index f2b52da..f68b327 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/NestedTupleSourcePOperator.java
@@ -71,4 +71,8 @@
builder.contributeMicroOperator(op, runtime, recDesc);
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
index cd99155..3f78793 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/PreSortedDistinctByPOperator.java
@@ -136,4 +136,8 @@
return PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY;
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
index c00e8f5..d8f7d99 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -64,10 +65,31 @@
ReplicateOperator rop = (ReplicateOperator) op;
int outputArity = rop.getOutputArity();
+ boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
- SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity);
+ SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity, outputMaterializationFlags);
contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
}
+
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ int[] inputDependencyLabels = new int[] { 0 };
+ ReplicateOperator rop = (ReplicateOperator) op;
+ int[] outputDependencyLabels = new int[rop.getOutputArity()];
+ // change the labels of outputs that requires materialization to 1
+ boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
+ for (int i = 0; i < rop.getOutputArity(); i++) {
+ if (outputMaterializationFlags[i]) {
+ outputDependencyLabels[i] = 1;
+ }
+ }
+ return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index 956c74f..3b2387a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -30,8 +30,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
@@ -99,4 +97,8 @@
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index c5fa192..aa24221 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -64,4 +64,8 @@
builder.contributeGraphEdge(src, 0, op, 0);
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index fea9701..9105c1f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -104,4 +104,8 @@
builder.contributeGraphEdge(src, 0, write, 0);
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
index e92e3ee..48a0fa6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SortGroupByPOperator.java
@@ -269,4 +269,16 @@
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
}
+
+ @Override
+ public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+ int[] inputDependencyLabels = new int[] { 0 };
+ int[] outputDependencyLabels = new int[] { 1 };
+ return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index 324709a..63ddffe 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -100,4 +100,8 @@
builder.contributeGraphEdge(src, 0, limit, 0);
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index fa7d34e..2ad5fda 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -81,4 +81,8 @@
this.flushFramesRapidly = flushFramesRapidly;
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
index bbf8e9a..3999fa6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamSelectPOperator.java
@@ -73,4 +73,8 @@
builder.contributeGraphEdge(src, 0, select, 0);
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
index 39271bb..e6a0be7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StringStreamingScriptPOperator.java
@@ -75,4 +75,8 @@
computeDeliveredPropertiesForUsedVariables(s, s.getInputVariables());
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
index 28df9fe..efc609a 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -104,4 +104,8 @@
builder.contributeGraphEdge(src, 0, op, 0);
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return true;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index 7fb6ddc..cda3b00 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -98,4 +98,8 @@
builder.contributeGraphEdge(src2, 0, op, 1);
}
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
index 8b241d3..f399184 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/WriteResultPOperator.java
@@ -103,4 +103,9 @@
ILogicalOperator src = writeResultOp.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, writeResultOp, 0);
}
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index 4ae956a..e1de495 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -41,7 +41,6 @@
import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
public class JobGenContext {
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 961e527..3af57ad 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -26,7 +26,9 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -43,7 +45,8 @@
return context;
}
- public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema, IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
+ public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema,
+ IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
JobSpecification spec = new JobSpecification(context.getFrameSize());
if (jobEventListenerFactory != null) {
spec.setJobletEventListenerFactory(jobEventListenerFactory);
@@ -63,8 +66,8 @@
return spec;
}
- private void compileOpRef(Mutable<ILogicalOperator> opRef, IOperatorDescriptorRegistry spec, IHyracksJobBuilder builder,
- IOperatorSchema outerPlanSchema) throws AlgebricksException {
+ private void compileOpRef(Mutable<ILogicalOperator> opRef, IOperatorDescriptorRegistry spec,
+ IHyracksJobBuilder builder, IOperatorSchema outerPlanSchema) throws AlgebricksException {
ILogicalOperator op = opRef.getValue();
int n = op.getInputs().size();
IOperatorSchema[] schemas = new IOperatorSchema[n];
@@ -100,10 +103,21 @@
Mutable<ILogicalOperator> child = entry.getKey();
List<Mutable<ILogicalOperator>> parents = entry.getValue();
if (parents.size() > 1) {
- int i = 0;
- for (Mutable<ILogicalOperator> parent : parents) {
- builder.contributeGraphEdge(child.getValue(), i, parent.getValue(), 0);
- i++;
+ if (child.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+ ReplicateOperator rop = (ReplicateOperator) child.getValue();
+ if (rop.isBlocker()) {
+ // make the order of the graph edges consistent with the order of rop's outputs
+ List<Mutable<ILogicalOperator>> outputs = rop.getOutputs();
+ for (Mutable<ILogicalOperator> parent : parents) {
+ builder.contributeGraphEdge(child.getValue(), outputs.indexOf(parent), parent.getValue(), 0);
+ }
+ } else {
+ int i = 0;
+ for (Mutable<ILogicalOperator> parent : parents) {
+ builder.contributeGraphEdge(child.getValue(), i, parent.getValue(), 0);
+ i++;
+ }
+ }
}
}
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
index 5e6bcae..910a8f9 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ComplexUnnestToProductRule.java
@@ -264,7 +264,7 @@
if (targetUsedVars == null) {
return false;
}
- } else {
+ } else if (innerMatches != 0 && outerMatches != 0) {
// The current operator produces variables that are used by both partitions, so the inner and outer are not independent and, therefore, we cannot create a join.
// TODO: We may still be able to split the operator to create a viable partitioning.
return false;
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
index 06b3c06..691006f 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java
@@ -15,15 +15,19 @@
package edu.uci.ics.hyracks.algebricks.rewriter.rules;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -48,8 +52,11 @@
private HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childrenToParents = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
private List<Mutable<ILogicalOperator>> roots = new ArrayList<Mutable<ILogicalOperator>>();
- private List<Mutable<ILogicalOperator>> joins = new ArrayList<Mutable<ILogicalOperator>>();
private List<List<Mutable<ILogicalOperator>>> equivalenceClasses = new ArrayList<List<Mutable<ILogicalOperator>>>();
+ private HashMap<Mutable<ILogicalOperator>, BitSet> opToCandidateInputs = new HashMap<Mutable<ILogicalOperator>, BitSet>();
+ private HashMap<Mutable<ILogicalOperator>, MutableInt> clusterMap = new HashMap<Mutable<ILogicalOperator>, MutableInt>();
+ private HashMap<Integer, BitSet> clusterWaitForMap = new HashMap<Integer, BitSet>();
+ private int lastUsedClusterId = 0;
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
@@ -78,17 +85,18 @@
changed = false;
// applying the rewriting until fixpoint
topDownMaterialization(roots);
- removeNonJoinBuildBranchCandidates();
genCandidates(context);
removeTrivialShare();
- removeNonJoinBuildBranchCandidates();
if (equivalenceClasses.size() > 0)
changed = rewrite(context);
if (!rewritten)
rewritten = changed;
equivalenceClasses.clear();
childrenToParents.clear();
- joins.clear();
+ opToCandidateInputs.clear();
+ clusterMap.clear();
+ clusterWaitForMap.clear();
+ lastUsedClusterId = 0;
} while (changed);
roots.clear();
}
@@ -111,41 +119,6 @@
equivalenceClasses.remove(i);
}
- private void removeNonJoinBuildBranchCandidates() {
- for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) {
- for (int i = candidates.size() - 1; i >= 0; i--) {
- Mutable<ILogicalOperator> opRef = candidates.get(i);
- boolean reserve = false;
- for (Mutable<ILogicalOperator> join : joins)
- if (isInJoinBuildBranch(join, opRef)) {
- reserve = true;
- }
- if (!reserve)
- candidates.remove(i);
- }
- }
- for (int i = equivalenceClasses.size() - 1; i >= 0; i--)
- if (equivalenceClasses.get(i).size() < 2)
- equivalenceClasses.remove(i);
- }
-
- private boolean isInJoinBuildBranch(Mutable<ILogicalOperator> joinRef, Mutable<ILogicalOperator> opRef) {
- Mutable<ILogicalOperator> buildBranch = joinRef.getValue().getInputs().get(1);
- do {
- if (buildBranch.equals(opRef)) {
- return true;
- } else {
- AbstractLogicalOperator aop = (AbstractLogicalOperator) buildBranch.getValue();
- if (aop.getOperatorTag() == LogicalOperatorTag.INNERJOIN
- || aop.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN
- || buildBranch.getValue().getInputs().size() == 0)
- return false;
- else
- buildBranch = buildBranch.getValue().getInputs().get(0);
- }
- } while (true);
- }
-
private boolean rewrite(IOptimizationContext context) throws AlgebricksException {
boolean changed = false;
for (List<Mutable<ILogicalOperator>> members : equivalenceClasses) {
@@ -170,40 +143,47 @@
members.remove(i);
}
}
- AbstractLogicalOperator rop = new ReplicateOperator(group.size());
+ boolean[] materializationFlags = computeMaterilizationFlags(group);
+ if (group.isEmpty()) {
+ continue;
+ }
+ candidate = group.get(0);
+ ReplicateOperator rop = new ReplicateOperator(group.size(), materializationFlags);
rop.setPhysicalOperator(new ReplicatePOperator());
rop.setExecutionMode(ExecutionMode.PARTITIONED);
Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);
AbstractLogicalOperator aopCandidate = (AbstractLogicalOperator) candidate.getValue();
+ List<Mutable<ILogicalOperator>> originalCandidateParents = childrenToParents.get(candidate);
if (aopCandidate.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
rop.getInputs().add(candidate);
} else {
AbstractLogicalOperator beforeExchange = new ExchangeOperator();
beforeExchange.setPhysicalOperator(new OneToOneExchangePOperator());
+ Mutable<ILogicalOperator> beforeExchangeRef = new MutableObject<ILogicalOperator>(beforeExchange);
beforeExchange.getInputs().add(candidate);
context.computeAndSetTypeEnvironmentForOperator(beforeExchange);
- rop.getInputs().add(new MutableObject<ILogicalOperator>(beforeExchange));
+ rop.getInputs().add(beforeExchangeRef);
}
context.computeAndSetTypeEnvironmentForOperator(rop);
- List<Mutable<ILogicalOperator>> parents = childrenToParents.get(candidate);
- for (Mutable<ILogicalOperator> parentRef : parents) {
+ for (Mutable<ILogicalOperator> parentRef : originalCandidateParents) {
AbstractLogicalOperator parent = (AbstractLogicalOperator) parentRef.getValue();
int index = parent.getInputs().indexOf(candidate);
if (parent.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
parent.getInputs().set(index, ropRef);
+ rop.getOutputs().add(parentRef);
} else {
AbstractLogicalOperator exchange = new ExchangeOperator();
exchange.setPhysicalOperator(new OneToOneExchangePOperator());
+ MutableObject<ILogicalOperator> exchangeRef = new MutableObject<ILogicalOperator>(exchange);
exchange.getInputs().add(ropRef);
+ rop.getOutputs().add(exchangeRef);
context.computeAndSetTypeEnvironmentForOperator(exchange);
- // parent.getInputs().get(index).setValue(exchange);
- parent.getInputs().set(index, new MutableObject<ILogicalOperator>(exchange));
+ parent.getInputs().set(index, exchangeRef);
context.computeAndSetTypeEnvironmentForOperator(parent);
}
}
-
List<LogicalVariable> liveVarsNew = new ArrayList<LogicalVariable>();
VariableUtilities.getLiveVariables(candidate.getValue(), liveVarsNew);
ArrayList<Mutable<ILogicalExpression>> assignExprs = new ArrayList<Mutable<ILogicalExpression>>();
@@ -226,9 +206,11 @@
AbstractLogicalOperator exchOp = new ExchangeOperator();
exchOp.setPhysicalOperator(new OneToOneExchangePOperator());
exchOp.getInputs().add(ropRef);
-
- assignOperator.getInputs().add(new MutableObject<ILogicalOperator>(exchOp));
+ MutableObject<ILogicalOperator> exchOpRef = new MutableObject<ILogicalOperator>(exchOp);
+ rop.getOutputs().add(exchOpRef);
+ assignOperator.getInputs().add(exchOpRef);
projectOperator.getInputs().add(new MutableObject<ILogicalOperator>(assignOperator));
+
// set the types
context.computeAndSetTypeEnvironmentForOperator(exchOp);
context.computeAndSetTypeEnvironmentForOperator(assignOperator);
@@ -247,18 +229,18 @@
}
}
- AbstractLogicalOperator exchg = new ExchangeOperator();
- exchg.setPhysicalOperator(new OneToOneExchangePOperator());
-
ILogicalOperator childOp = parentOp.getOperatorTag() == LogicalOperatorTag.PROJECT ? assignOperator
: projectOperator;
if (parentOp.isMap()) {
parentOp.getInputs().set(index, new MutableObject<ILogicalOperator>(childOp));
} else {
+ AbstractLogicalOperator exchg = new ExchangeOperator();
+ exchg.setPhysicalOperator(new OneToOneExchangePOperator());
exchg.getInputs().add(new MutableObject<ILogicalOperator>(childOp));
parentOp.getInputs().set(index, new MutableObject<ILogicalOperator>(exchg));
+ context.computeAndSetTypeEnvironmentForOperator(exchg);
}
- context.computeAndSetTypeEnvironmentForOperator(exchg);
+ context.computeAndSetTypeEnvironmentForOperator(parentOp);
}
}
rewritten = true;
@@ -302,11 +284,6 @@
List<Mutable<ILogicalOperator>> candidates = new ArrayList<Mutable<ILogicalOperator>>();
List<Mutable<ILogicalOperator>> nextLevel = new ArrayList<Mutable<ILogicalOperator>>();
for (Mutable<ILogicalOperator> op : tops) {
- AbstractLogicalOperator aop = (AbstractLogicalOperator) op.getValue();
- if ((aop.getOperatorTag() == LogicalOperatorTag.INNERJOIN || aop.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN)
- && !joins.contains(op)) {
- joins.add(op);
- }
for (Mutable<ILogicalOperator> opRef : op.getValue().getInputs()) {
List<Mutable<ILogicalOperator>> opRefList = childrenToParents.get(opRef);
if (opRefList == null) {
@@ -335,19 +312,34 @@
candidates.clear();
boolean validCandidate = false;
for (Mutable<ILogicalOperator> op : opList) {
- for (Mutable<ILogicalOperator> inputRef : op.getValue().getInputs()) {
+ List<Mutable<ILogicalOperator>> inputs = op.getValue().getInputs();
+ for (int i = 0; i < inputs.size(); i++) {
+ Mutable<ILogicalOperator> inputRef = inputs.get(i);
validCandidate = false;
- // if current input is in candidates
- for (Mutable<ILogicalOperator> candidate : previousCandidates)
- if (inputRef.getValue().equals(candidate.getValue()))
- validCandidate = true;
- // if one input is not in candidates
- if (!validCandidate)
- break;
+ for (Mutable<ILogicalOperator> candidate : previousCandidates) {
+ // if current input is in candidates
+ if (inputRef.getValue().equals(candidate.getValue())) {
+ if (inputs.size() == 1) {
+ validCandidate = true;
+ } else {
+ BitSet candidateInputBitMap = opToCandidateInputs.get(op);
+ if (candidateInputBitMap == null) {
+ candidateInputBitMap = new BitSet(inputs.size());
+ opToCandidateInputs.put(op, candidateInputBitMap);
+ }
+ candidateInputBitMap.set(i);
+ if (candidateInputBitMap.cardinality() == inputs.size()) {
+ validCandidate = true;
+ }
+ }
+ break;
+ }
+ }
}
if (!validCandidate)
continue;
- candidates.add(op);
+ if (!candidates.contains(op))
+ candidates.add(op);
}
}
@@ -390,4 +382,122 @@
}
}
+ private boolean[] computeMaterilizationFlags(List<Mutable<ILogicalOperator>> group) {
+ lastUsedClusterId = 0;
+ for (Mutable<ILogicalOperator> root : roots) {
+ computeClusters(null, root, new MutableInt(++lastUsedClusterId));
+ }
+ boolean[] materializationFlags = new boolean[group.size()];
+ boolean worthMaterialization = worthMaterialization(group.get(0));
+ boolean requiresMaterialization;
+ // get clusterIds for each candidate in the group
+ List<Integer> groupClusterIds = new ArrayList<Integer>(group.size());
+ for (int i = 0; i < group.size(); i++) {
+ groupClusterIds.add(clusterMap.get(group.get(i)).getValue());
+ }
+ for (int i = group.size() - 1; i >= 0; i--) {
+ requiresMaterialization = requiresMaterialization(groupClusterIds, i);
+ if (requiresMaterialization && !worthMaterialization) {
+ group.remove(i);
+ groupClusterIds.remove(i);
+ }
+ materializationFlags[i] = requiresMaterialization;
+ }
+ if (group.size() < 2) {
+ group.clear();
+ }
+ // if does not worth materialization, the flags for the remaining candidates should be false
+ return worthMaterialization ? materializationFlags : new boolean[group.size()];
+ }
+
+ private boolean requiresMaterialization(List<Integer> groupClusterIds, int index) {
+ Integer clusterId = groupClusterIds.get(index);
+ BitSet blockingClusters = new BitSet();
+ getAllBlockingClusterIds(clusterId, blockingClusters);
+ if (!blockingClusters.isEmpty()) {
+ for (int i = 0; i < groupClusterIds.size(); i++) {
+ if (i == index) {
+ continue;
+ }
+ if (blockingClusters.get(groupClusterIds.get(i))) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private void getAllBlockingClusterIds(int clusterId, BitSet blockingClusters) {
+ BitSet waitFor = clusterWaitForMap.get(clusterId);
+ if (waitFor != null) {
+ for (int i = waitFor.nextSetBit(0); i >= 0; i = waitFor.nextSetBit(i + 1)) {
+ getAllBlockingClusterIds(i, blockingClusters);
+ }
+ blockingClusters.or(waitFor);
+ }
+ }
+
+ private void computeClusters(Mutable<ILogicalOperator> parentRef, Mutable<ILogicalOperator> opRef,
+ MutableInt currentClusterId) {
+ // only replicate operator has multiple outputs
+ int outputIndex = 0;
+ if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+ ReplicateOperator rop = (ReplicateOperator) opRef.getValue();
+ List<Mutable<ILogicalOperator>> outputs = rop.getOutputs();
+ for (outputIndex = 0; outputIndex < outputs.size(); outputIndex++) {
+ if (outputs.get(outputIndex).equals(parentRef)) {
+ break;
+ }
+ }
+ }
+ AbstractLogicalOperator aop = (AbstractLogicalOperator) opRef.getValue();
+ Pair<int[], int[]> labels = aop.getPhysicalOperator().getInputOutputDependencyLabels(opRef.getValue());
+ List<Mutable<ILogicalOperator>> inputs = opRef.getValue().getInputs();
+ for (int i = 0; i < inputs.size(); i++) {
+ Mutable<ILogicalOperator> inputRef = inputs.get(i);
+ if (labels.second[outputIndex] == 1 && labels.first[i] == 0) { // 1 -> 0
+ if (labels.second.length == 1) {
+ clusterMap.put(opRef, currentClusterId);
+ // start a new cluster
+ MutableInt newClusterId = new MutableInt(++lastUsedClusterId);
+ computeClusters(opRef, inputRef, newClusterId);
+ BitSet waitForList = clusterWaitForMap.get(currentClusterId.getValue());
+ if (waitForList == null) {
+ waitForList = new BitSet();
+ clusterWaitForMap.put(currentClusterId.getValue(), waitForList);
+ }
+ waitForList.set(newClusterId.getValue());
+ }
+ } else { // 0 -> 0 and 1 -> 1
+ MutableInt prevClusterId = clusterMap.get(opRef);
+ if (prevClusterId == null || prevClusterId.getValue().equals(currentClusterId.getValue())) {
+ clusterMap.put(opRef, currentClusterId);
+ computeClusters(opRef, inputRef, currentClusterId);
+ } else {
+ // merge prevClusterId and currentClusterId: update all the map entries that has currentClusterId to prevClusterId
+ for (BitSet bs : clusterWaitForMap.values()) {
+ if (bs.get(currentClusterId.getValue())) {
+ bs.clear(currentClusterId.getValue());
+ bs.set(prevClusterId.getValue());
+ }
+ }
+ currentClusterId.setValue(prevClusterId.getValue());
+ }
+ }
+ }
+ }
+
+ private boolean worthMaterialization(Mutable<ILogicalOperator> candidate) {
+ AbstractLogicalOperator aop = (AbstractLogicalOperator) candidate.getValue();
+ if (aop.getPhysicalOperator().expensiveThanMaterialization()) {
+ return true;
+ }
+ List<Mutable<ILogicalOperator>> inputs = candidate.getValue().getInputs();
+ for (Mutable<ILogicalOperator> inputRef : inputs) {
+ if (worthMaterialization(inputRef)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
index bc05181..c34a52f 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveRedundantVariablesRule.java
@@ -48,22 +48,20 @@
* Replaces redundant variable references with their bottom-most equivalent representative.
* Does a DFS sweep over the plan keeping track of variable equivalence classes.
* For example, this rule would perform the following rewrite.
- *
* Before Plan:
* select (function-call: func, Args:[%0->$$11])
- * project [$11]
- * assign [$$11] <- [$$10]
- * assign [$$10] <- [$$9]
- * assign [$$9] <- ...
- * ...
- *
+ * project [$11]
+ * assign [$$11] <- [$$10]
+ * assign [$$10] <- [$$9]
+ * assign [$$9] <- ...
+ * ...
* After Plan:
* select (function-call: func, Args:[%0->$$9])
- * project [$9]
- * assign [$$11] <- [$$9]
- * assign [$$10] <- [$$9]
- * assign [$$9] <- ...
- * ...
+ * project [$9]
+ * assign [$$11] <- [$$9]
+ * assign [$$10] <- [$$9]
+ * assign [$$9] <- ...
+ * ...
*/
public class RemoveRedundantVariablesRule implements IAlgebraicRewriteRule {
@@ -71,7 +69,7 @@
private final Map<LogicalVariable, List<LogicalVariable>> equivalentVarsMap = new HashMap<LogicalVariable, List<LogicalVariable>>();
protected boolean hasRun = false;
-
+
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
return false;
@@ -138,7 +136,7 @@
if (replaceProjectVars((ProjectOperator) op)) {
modified = true;
}
- } else if(op.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
+ } else if (op.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
// Replace redundant variables manually in the UnionAll operator.
if (replaceUnionAllVars((UnionAllOperator) op)) {
modified = true;
@@ -205,6 +203,25 @@
}
}
}
+ // find the redundant variables within the decor list
+ Map<LogicalVariable, LogicalVariable> variableToFirstDecorMap = new HashMap<LogicalVariable, LogicalVariable>();
+ Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> iter = groupOp.getDecorList().iterator();
+ while (iter.hasNext()) {
+ Pair<LogicalVariable, Mutable<ILogicalExpression>> dp = iter.next();
+ if (dp.first == null || dp.second.getValue().getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ continue;
+ }
+ LogicalVariable dv = ((VariableReferenceExpression) dp.second.getValue()).getVariableReference();
+ LogicalVariable firstDecor = variableToFirstDecorMap.get(dv);
+ if (firstDecor == null) {
+ variableToFirstDecorMap.put(dv, dp.first);
+ } else {
+ // The decor variable dp.first is redundant since firstDecor is exactly the same.
+ updateEquivalenceClassMap(dp.first, firstDecor);
+ iter.remove();
+ modified = true;
+ }
+ }
return modified;
}
@@ -251,7 +268,7 @@
}
return modified;
}
-
+
private class VariableSubstitutionVisitor implements ILogicalExpressionReferenceTransform {
@Override
public boolean transform(Mutable<ILogicalExpression> exprRef) {
diff --git a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q7_volume_shipping.plan b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q7_volume_shipping.plan
index 9f62e7b..70d8e7b 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q7_volume_shipping.plan
+++ b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q7_volume_shipping.plan
@@ -14,38 +14,58 @@
-- NESTED_LOOP |PARTITIONED|
exchange
-- BROADCAST_EXCHANGE |PARTITIONED|
- select (function-call: algebricks:eq, Args:[%0->$$2, GERMANY])
- -- STREAM_SELECT |PARTITIONED|
- exchange
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan [$$1, $$2]<-[$$1, $$2, $$3, $$4] <- default.nation
- -- DATASOURCE_SCAN |PARTITIONED|
- exchange
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- empty-tuple-source
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ project ([$$1, $$2])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$1, $$2] <- [%0->$$9, %0->$$10]
+ -- ASSIGN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ replicate
+ -- SPLIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ select (function-call: algebricks:eq, Args:[%0->$$10, GERMANY])
+ -- STREAM_SELECT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ replicate
+ -- SPLIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan [$$9, $$10]<-[$$9, $$10, $$11, $$12] <- default.nation
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
project ([$$5, $$6])
-- STREAM_PROJECT |PARTITIONED|
- select (function-call: algebricks:eq, Args:[%0->$$6, FRANCE])
- -- STREAM_SELECT |PARTITIONED|
- project ([$$5, $$6])
- -- STREAM_PROJECT |PARTITIONED|
- assign [$$5, $$6] <- [%0->$$9, %0->$$10]
- -- ASSIGN |PARTITIONED|
- exchange
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- replicate
- -- SPLIT |PARTITIONED|
- exchange
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan [$$9, $$10]<-[$$9, $$10, $$11, $$12] <- default.nation
- -- DATASOURCE_SCAN |PARTITIONED|
- exchange
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- empty-tuple-source
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ replicate
+ -- SPLIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ select (function-call: algebricks:eq, Args:[%0->$$6, FRANCE])
+ -- STREAM_SELECT |PARTITIONED|
+ project ([$$5, $$6])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$5, $$6] <- [%0->$$9, %0->$$10]
+ -- ASSIGN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ replicate
+ -- SPLIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan [$$9, $$10]<-[$$9, $$10, $$11, $$12] <- default.nation
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
project ([$$10, $$14, $$9, $$13])
@@ -56,34 +76,58 @@
-- NESTED_LOOP |PARTITIONED|
exchange
-- BROADCAST_EXCHANGE |PARTITIONED|
- select (function-call: algebricks:eq, Args:[%0->$$14, FRANCE])
- -- STREAM_SELECT |PARTITIONED|
- exchange
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan [$$13, $$14]<-[$$13, $$14, $$15, $$16] <- default.nation
- -- DATASOURCE_SCAN |PARTITIONED|
- exchange
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- empty-tuple-source
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
- exchange
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- project ([$$9, $$10])
+ project ([$$13, $$14])
-- STREAM_PROJECT |PARTITIONED|
- select (function-call: algebricks:eq, Args:[%0->$$10, GERMANY])
- -- STREAM_SELECT |PARTITIONED|
+ assign [$$13, $$14] <- [%0->$$5, %0->$$6]
+ -- ASSIGN |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
replicate
-- SPLIT |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan [$$9, $$10]<-[$$9, $$10, $$11, $$12] <- default.nation
- -- DATASOURCE_SCAN |PARTITIONED|
- exchange
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- empty-tuple-source
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ select (function-call: algebricks:eq, Args:[%0->$$6, FRANCE])
+ -- STREAM_SELECT |PARTITIONED|
+ project ([$$5, $$6])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$5, $$6] <- [%0->$$9, %0->$$10]
+ -- ASSIGN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ replicate
+ -- SPLIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan [$$9, $$10]<-[$$9, $$10, $$11, $$12] <- default.nation
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ project ([$$9, $$10])
+ -- STREAM_PROJECT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ replicate
+ -- SPLIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ select (function-call: algebricks:eq, Args:[%0->$$10, GERMANY])
+ -- STREAM_SELECT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ replicate
+ -- SPLIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan [$$9, $$10]<-[$$9, $$10, $$11, $$12] <- default.nation
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
write [%0->$$47, %0->$$48, %0->$$49, %0->$$50]
-- SINK_WRITE |PARTITIONED|
project ([$$47, $$48, $$49, $$50])
diff --git a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u10_nestedloop_join.plan b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u10_nestedloop_join.plan
index c86d57f..a2bb4e4 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u10_nestedloop_join.plan
+++ b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u10_nestedloop_join.plan
@@ -8,17 +8,31 @@
-- NESTED_LOOP |PARTITIONED|
exchange
-- BROADCAST_EXCHANGE |PARTITIONED|
- data-scan [$$1, $$2]<-[$$1, $$2, $$3, $$4] <- default.nation
- -- DATASOURCE_SCAN |PARTITIONED|
- exchange
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- empty-tuple-source
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ project ([$$1, $$2])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$1, $$2] <- [%0->$$5, %0->$$6]
+ -- ASSIGN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ replicate
+ -- SPLIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan [$$5, $$6]<-[$$5, $$6, $$7, $$8] <- default.nation
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan [$$5, $$6]<-[$$5, $$6, $$7, $$8] <- default.nation
- -- DATASOURCE_SCAN |PARTITIONED|
+ replicate
+ -- SPLIT |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- empty-tuple-source
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ data-scan [$$5, $$6]<-[$$5, $$6, $$7, $$8] <- default.nation
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u3_union.plan b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u3_union.plan
index c4040f2..3923e19 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u3_union.plan
+++ b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/u3_union.plan
@@ -12,14 +12,22 @@
-- ASSIGN |PARTITIONED|
select (function-call: algebricks:gt, Args:[function-call: hive:org.apache.hadoop.hive.ql.udf.UDFOPMultiply, Args:[%0->$$1, 2], 50])
-- STREAM_SELECT |PARTITIONED|
- exchange
- -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$1, $$2, $$3, $$4, $$5, $$6, $$7] <- default.supplier
- -- DATASOURCE_SCAN |PARTITIONED|
+ project ([$$1, $$2, $$3, $$4, $$5, $$6, $$7])
+ -- STREAM_PROJECT |PARTITIONED|
+ assign [$$1, $$2, $$3, $$4, $$5, $$6, $$7] <- [%0->$$9, %0->$$10, %0->$$11, %0->$$12, %0->$$13, %0->$$14, %0->$$15]
+ -- ASSIGN |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- empty-tuple-source
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ replicate
+ -- SPLIT |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ data-scan []<-[$$9, $$10, $$11, $$12, $$13, $$14, $$15] <- default.supplier
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
project ([$$16, $$11, $$12, $$10])
@@ -30,9 +38,13 @@
-- STREAM_SELECT |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- data-scan []<-[$$9, $$10, $$11, $$12, $$13, $$14, $$15] <- default.supplier
- -- DATASOURCE_SCAN |PARTITIONED|
+ replicate
+ -- SPLIT |PARTITIONED|
exchange
-- ONE_TO_ONE_EXCHANGE |PARTITIONED|
- empty-tuple-source
- -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ data-scan []<-[$$9, $$10, $$11, $$12, $$13, $$14, $$15] <- default.supplier
+ -- DATASOURCE_SCAN |PARTITIONED|
+ exchange
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ empty-tuple-source
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index f3f8a2f..700dec8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -23,7 +23,6 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobActivityGraph;
public abstract class AbstractOperatorDescriptor implements IOperatorDescriptor {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java
new file mode 100644
index 0000000..48de837
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+
+public class MaterializerTaskState extends AbstractStateObject {
+ private RunFileWriter out;
+
+ public MaterializerTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+
+ }
+
+ public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ MaterializerTaskState.class.getSimpleName());
+ out = new RunFileWriter(file, ctx.getIOManager());
+ out.open();
+ }
+
+ public void close() throws HyracksDataException {
+ out.close();
+ }
+
+ public void appendFrame(ByteBuffer buffer) throws HyracksDataException {
+ out.nextFrame(buffer);
+ }
+
+ public void writeOut(IFrameWriter writer, ByteBuffer frame) throws HyracksDataException {
+ RunFileReader in = out.createReader();
+ writer.open();
+ try {
+ in.open();
+ while (in.nextFrame(frame)) {
+ frame.flip();
+ writer.nextFrame(frame);
+ frame.clear();
+ }
+ in.close();
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ writer.close();
+ }
+ }
+
+ public void deleteFile() {
+ out.getFileReference().delete();
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 89c20d6..3a405d0 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -14,12 +14,8 @@
*/
package edu.uci.ics.hyracks.dataflow.std.misc;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
@@ -28,14 +24,9 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
-import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
@@ -84,57 +75,6 @@
}
- public static class MaterializerTaskState extends AbstractStateObject {
- private RunFileWriter out;
-
- public MaterializerTaskState() {
- }
-
- private MaterializerTaskState(JobId jobId, TaskId taskId) {
- super(jobId, taskId);
- }
-
- @Override
- public void toBytes(DataOutput out) throws IOException {
-
- }
-
- @Override
- public void fromBytes(DataInput in) throws IOException {
-
- }
-
- public void open(IHyracksTaskContext ctx) throws HyracksDataException {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- MaterializingOperatorDescriptor.class.getSimpleName());
- out = new RunFileWriter(file, ctx.getIOManager());
- out.open();
- }
-
- public void appendFrame(ByteBuffer buffer) throws HyracksDataException {
- out.nextFrame(buffer);
- }
-
- public void writeOut(IFrameWriter writer, ByteBuffer frame) throws HyracksDataException {
- RunFileReader in = out.createReader();
- writer.open();
- try {
- in.open();
- while (in.nextFrame(frame)) {
- frame.flip();
- writer.nextFrame(frame);
- frame.clear();
- }
- in.close();
- } catch (Exception e) {
- writer.fail();
- throw new HyracksDataException(e);
- } finally {
- writer.close();
- }
- }
- }
-
private final class MaterializerReaderActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
@@ -166,7 +106,7 @@
@Override
public void close() throws HyracksDataException {
- state.out.close();
+ state.close();
ByteBuffer frame = ctx.allocateFrame();
state.writeOut(writer, frame);
}
@@ -202,7 +142,7 @@
@Override
public void close() throws HyracksDataException {
- state.out.close();
+ state.close();
ctx.setStateObject(state);
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 0e8f303..b8e1ac8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -18,64 +18,176 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-public class SplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
private static final long serialVersionUID = 1L;
+ private final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
+ private final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
+
+ private boolean[] outputMaterializationFlags;
+ private boolean requiresMaterialization;
+ private int numberOfNonMaterializedOutputs = 0;
+ private int numberOfActiveMaterializeReaders = 0;
+
public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) {
+ this(spec, rDesc, outputArity, new boolean[outputArity]);
+ }
+
+ public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
+ boolean[] outputMaterializationFlags) {
super(spec, 1, outputArity);
for (int i = 0; i < outputArity; i++) {
recordDescriptors[i] = rDesc;
}
+ this.outputMaterializationFlags = outputMaterializationFlags;
+ requiresMaterialization = false;
+ for (boolean flag : outputMaterializationFlags) {
+ if (flag) {
+ requiresMaterialization = true;
+ break;
+ }
+ }
+
}
@Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
- throws HyracksDataException {
- return new AbstractUnaryInputOperatorNodePushable() {
- private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(new ActivityId(odId,
+ SPLITTER_MATERIALIZER_ACTIVITY_ID));
+ builder.addActivity(this, sma);
+ builder.addSourceEdge(0, sma, 0);
+ int taskOutputIndex = 0;
+ for (int i = 0; i < outputArity; i++) {
+ if (!outputMaterializationFlags[i]) {
+ builder.addTargetEdge(i, sma, taskOutputIndex);
+ taskOutputIndex++;
+ }
+ }
+ numberOfNonMaterializedOutputs = taskOutputIndex;
- @Override
- public void close() throws HyracksDataException {
- for (IFrameWriter writer : writers) {
- writer.close();
+ if (requiresMaterialization) {
+ int activityId = MATERIALIZE_READER_ACTIVITY_ID;
+ for (int i = 0; i < outputArity; i++) {
+ if (outputMaterializationFlags[i]) {
+ MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(new ActivityId(odId,
+ activityId));
+ builder.addActivity(this, mra);
+ builder.addTargetEdge(i, mra, 0);
+ builder.addBlockingEdge(sma, mra);
+ numberOfActiveMaterializeReaders++;
+ activityId++;
}
}
+ }
+ }
- @Override
- public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers) {
- writer.fail();
+ private final class SplitterMaterializerActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public SplitterMaterializerActivityNode(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ return new AbstractUnaryInputOperatorNodePushable() {
+ private MaterializerTaskState state;
+ private final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (requiresMaterialization) {
+ state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(
+ getActivityId(), partition));
+ state.open(ctx);
+ }
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ writers[i].open();
+ }
}
- }
- @Override
- public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
- for (IFrameWriter writer : writers) {
- FrameUtils.flushFrame(bufferAccessor, writer);
+ @Override
+ public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+ if (requiresMaterialization) {
+ state.appendFrame(bufferAccessor);
+ }
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ FrameUtils.flushFrame(bufferAccessor, writers[i]);
+ }
}
- }
- @Override
- public void open() throws HyracksDataException {
- for (IFrameWriter writer : writers) {
- writer.open();
+ @Override
+ public void close() throws HyracksDataException {
+ if (requiresMaterialization) {
+ state.close();
+ ctx.setStateObject(state);
+ }
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ writers[i].close();
+ }
}
- }
- @Override
- public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- writers[index] = writer;
- }
- };
+ @Override
+ public void fail() throws HyracksDataException {
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ writers[i].fail();
+ }
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+ writers[index] = writer;
+ }
+ };
+ }
+ }
+
+ private final class MaterializeReaderActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public MaterializeReaderActivityNode(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+ throws HyracksDataException {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ ByteBuffer frame = ctx.allocateFrame();
+ MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
+ getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+ state.writeOut(writer, frame);
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ numberOfActiveMaterializeReaders--;
+ MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
+ getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+ if (numberOfActiveMaterializeReaders == 0) {
+ state.deleteFile();
+ }
+ }
+ };
+ }
}
}