GroupJoin: Merged with stabilization rev 1821
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_groupjoin_integration@1822 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/INestedPlan.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/INestedPlan.java
new file mode 100644
index 0000000..1cee5ef
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/INestedPlan.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.base;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+
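+/**
+ * Contract for logical operators that carry nested plans (group-by, subplan, and
+ * the new group-join), so that visitors and job-generation code can work with
+ * nested plans without depending on AbstractOperatorWithNestedPlans.
+ */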
+public interface INestedPlan {
+
+ public LogicalOperatorTag getOperatorTag();
+
+ public List<ILogicalPlan> getNestedPlans();
+
+ public boolean hasNestedPlans();
+
+ public LinkedList<Mutable<ILogicalOperator>> allRootsInReverseOrder();
+
+ //
+ // @Override
+ // public void computeConstraintsAndEquivClasses() {
+ // for (ILogicalPlan p : nestedPlans) {
+ // for (LogicalOperatorReference r : p.getRoots()) {
+ // AbstractLogicalOperator op = (AbstractLogicalOperator) r.getOperator();
+ // equivalenceClasses.putAll(op.getEquivalenceClasses());
+ // functionalDependencies.addAll(op.getFDs());
+ // }
+ // }
+ // }
+
+ public void recomputeSchema();
+
+ public boolean isMap();
+
+ public void getUsedVariablesExceptNestedPlans(Collection<LogicalVariable> vars);
+
+ public void getProducedVariablesExceptNestedPlans(Collection<LogicalVariable> vars);
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 5234d2c..1c9628f 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -23,6 +23,7 @@
GROUP,
EMPTYTUPLESOURCE,
EXCHANGE,
+ GROUPJOIN,
INNERJOIN,
LEFTOUTERJOIN,
LIMIT,
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index a969372..8349cb2 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -10,10 +10,12 @@
EMPTY_TUPLE_SOURCE,
EXTERNAL_GROUP_BY,
IN_MEMORY_HASH_JOIN,
+ IN_MEMORY_HASH_GROUP_JOIN,
HASH_GROUP_BY,
HASH_PARTITION_EXCHANGE,
HASH_PARTITION_MERGE_EXCHANGE,
HYBRID_HASH_JOIN,
+ HYBRID_HASH_GROUP_JOIN,
HDFS_READER,
IN_MEMORY_STABLE_SORT,
MICRO_PRE_CLUSTERED_GROUP_BY,
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java
index a720039..45d4be9 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AggregateFunctionCallExpression.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
/**
*
@@ -96,6 +97,7 @@
@Override
public <R, T> R accept(ILogicalExpressionVisitor<R, T> visitor, T arg) throws AlgebricksException {
+ AlgebricksConfig.ALGEBRICKS_LOGGER.finest(" *** AggregateFunctionCallExpression: isTwoStep = " + isTwoStep());
return visitor.visitAggregateFunctionCallExpression(this, arg);
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractOperatorWithNestedPlans.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractOperatorWithNestedPlans.java
index ea4f3e0..7fb6e68 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractOperatorWithNestedPlans.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractOperatorWithNestedPlans.java
@@ -23,9 +23,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-public abstract class AbstractOperatorWithNestedPlans extends AbstractLogicalOperator {
+public abstract class AbstractOperatorWithNestedPlans extends AbstractLogicalOperator implements INestedPlan {
protected final List<ILogicalPlan> nestedPlans;
public AbstractOperatorWithNestedPlans() {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupJoinOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupJoinOperator.java
new file mode 100644
index 0000000..6197f03
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/GroupJoinOperator.java
@@ -0,0 +1,337 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.TypePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypeEnvPointer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.OpRefTypeEnvPointer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+
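+/**
+ * Logical operator that fuses a binary (inner or left-outer) join with a group-by:
+ * it carries a group-by list, a decoration list, and nested aggregate plans in the
+ * same style as GroupByOperator, while inheriting the join condition and inputs
+ * from AbstractBinaryJoinOperator.
+ */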
+public class GroupJoinOperator extends AbstractBinaryJoinOperator implements INestedPlan {
+
+ private final List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gByList;
+ private final List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList;
+ private Mutable<List<ILogicalPlan>> nestedPlans;
+
+ public GroupJoinOperator(JoinKind joinKind, Mutable<ILogicalExpression> condition,
+ Mutable<ILogicalOperator> input1, Mutable<ILogicalOperator> input2) {
+ super(joinKind, condition, input1, input2);
+ gByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+ decorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+ this.nestedPlans = new MutableObject<List<ILogicalPlan>>();
+ }
+
+ public GroupJoinOperator(Mutable<ILogicalExpression> condition) {
+ super(JoinKind.LEFT_OUTER, condition);
+ gByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+ decorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+ this.nestedPlans = new MutableObject<List<ILogicalPlan>>();
+ }
+
+ public GroupJoinOperator(JoinKind joinKind, Mutable<ILogicalExpression> condition,
+ Mutable<ILogicalOperator> input1, Mutable<ILogicalOperator> input2,
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList,
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList, List<ILogicalPlan> nestedPlans) throws AlgebricksException {
+ super(joinKind, condition, input1, input2);
+ this.gByList = groupByList;
+ this.decorList = decorList;
+ this.nestedPlans = new MutableObject<List<ILogicalPlan>>(nestedPlans);
+ }
+
+ @Override
+ public LogicalOperatorTag getOperatorTag() {
+ return LogicalOperatorTag.GROUPJOIN;
+ }
+
+ public boolean hasNestedPlans() {
+ return true;
+ }
+
+ public List<ILogicalPlan> getNestedPlans() {
+ return nestedPlans.getValue();
+ }
+
+ public void setNestedPlans(List<ILogicalPlan> nPlans) {
+ this.nestedPlans.setValue(nPlans);
+ }
+
+ public List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> getGroupByList() {
+ return gByList;
+ }
+
+ public List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> getDecorList() {
+ return decorList;
+ }
+
+ public String gByListToString() {
+ return veListToString(gByList);
+ }
+
+ public String decorListToString() {
+ return veListToString(decorList);
+ }
+
+ public List<LogicalVariable> getGbyVarList() {
+ List<LogicalVariable> varList = new ArrayList<LogicalVariable>(gByList.size());
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : gByList) {
+ ILogicalExpression expr = ve.second.getValue();
+ if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ VariableReferenceExpression v = (VariableReferenceExpression) expr;
+ varList.add(v.getVariableReference());
+ }
+ }
+ return varList;
+ }
+
+ public static String veListToString(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ boolean fst = true;
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : vePairList) {
+ if (fst) {
+ fst = false;
+ } else {
+ sb.append("; ");
+ }
+ if (ve.first != null) {
+ sb.append(ve.first + " := " + ve.second);
+ } else {
+ sb.append(ve.second.getValue());
+ }
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ @Override
+ public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+ return visitor.visitGroupJoinOperator(this, arg);
+ }
+
+ @Override
+ public void recomputeSchema() {
+ schema = new ArrayList<LogicalVariable>();
+// schema.addAll(inputs.get(0).getValue().getSchema());
+// schema.addAll(inputs.get(1).getValue().getSchema());
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+ if (p.first != null)
+ schema.add(p.first);
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {
+ schema.add(getDecorVariable(p));
+ }
+
+ for (ILogicalPlan p : getNestedPlans()) {
+ if (p == null) {
+ continue;
+ }
+ for (Mutable<ILogicalOperator> o : p.getRoots()) {
+ AbstractLogicalOperator nestedOp = (AbstractLogicalOperator) o.getValue();
+ if (nestedOp != null) {
+ schema.addAll(nestedOp.getSchema());
+ }
+ }
+ }
+ }
+
+ @Override
+ public LinkedList<Mutable<ILogicalOperator>> allRootsInReverseOrder() {
+ LinkedList<Mutable<ILogicalOperator>> allRoots = new LinkedList<Mutable<ILogicalOperator>>();
+ for (ILogicalPlan p : getNestedPlans()) {
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+ allRoots.addFirst(r);
+ }
+ }
+ return allRoots;
+ }
+
+ @Override
+ public void getProducedVariablesExceptNestedPlans(Collection<LogicalVariable> vars) {
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+ if (p.first != null) {
+ vars.add(p.first);
+ }
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {
+ if (p.first != null) {
+ vars.add(p.first);
+ }
+ }
+ }
+
+ @Override
+ public void getUsedVariablesExceptNestedPlans(Collection<LogicalVariable> vars) {
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> g : gByList) {
+ g.second.getValue().getUsedVariables(vars);
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> g : decorList) {
+ g.second.getValue().getUsedVariables(vars);
+ }
+ getCondition().getValue().getUsedVariables(vars);
+ }
+
+ @Override
+ public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+ ILogicalOperator child1 = inputs.get(0).getValue();
+ ILogicalOperator child2 = inputs.get(1).getValue();
+ IVariableTypeEnvironment env1 = ctx.getOutputTypeEnvironment(child1);
+ IVariableTypeEnvironment env2 = ctx.getOutputTypeEnvironment(child2);
+ int n = 0;
+ for (ILogicalPlan p : getNestedPlans()) {
+ n += p.getRoots().size();
+ }
+ ITypeEnvPointer[] envPointers = new ITypeEnvPointer[n];
+ int i = 0;
+ for (ILogicalPlan p : getNestedPlans()) {
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+// ctx.setOutputTypeEnvironment(r.getValue(), env1);
+ envPointers[i] = new OpRefTypeEnvPointer(r, ctx);
+ i++;
+ }
+ }
+
+ IVariableTypeEnvironment env;
+
+ if (getJoinKind() == JoinKind.LEFT_OUTER) {
+ env = new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
+ ctx.getNullableTypeComputer(), ctx.getMetadataProvider(), TypePropagationPolicy.LEFT_OUTER, envPointers);
+ } else {
+ env = new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
+ ctx.getNullableTypeComputer(), ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
+ }
+
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : getGroupByList()) {
+ ILogicalExpression expr = p.second.getValue();
+ if (p.first != null) {
+ env.setVarType(p.first, env1.getType(expr));
+ if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ LogicalVariable v1 = ((VariableReferenceExpression) expr).getVariableReference();
+ env.setVarType(v1, env1.getVarType(v1));
+ }
+ } else {
+ VariableReferenceExpression vre = (VariableReferenceExpression) p.second.getValue();
+ LogicalVariable v2 = vre.getVariableReference();
+ env.setVarType(v2, env1.getVarType(v2));
+ }
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : getDecorList()) {
+ ILogicalExpression expr = p.second.getValue();
+ if (p.first != null) {
+ env.setVarType(p.first, env1.getType(expr));
+ } else {
+ VariableReferenceExpression vre = (VariableReferenceExpression) p.second.getValue();
+ LogicalVariable v2 = vre.getVariableReference();
+ env.setVarType(v2, env1.getVarType(v2));
+ }
+ }
+ return env;
+ }
+
+ @Override
+ public VariablePropagationPolicy getVariablePropagationPolicy() {
+ VariablePropagationPolicy groupJoinVarPropPolicy = new VariablePropagationPolicy() {
+
+ @Override
+ public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+ throws AlgebricksException {
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+ ILogicalExpression expr = p.second.getValue();
+ if (p.first != null) {
+ target.addVariable(p.first);
+ } else {
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new AlgebricksException("GroupJoin expects variable references.");
+ }
+ VariableReferenceExpression v = (VariableReferenceExpression) expr;
+ target.addVariable(v.getVariableReference());
+ }
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {
+ ILogicalExpression expr = p.second.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new AlgebricksException("GroupJoin expects variable references.");
+ }
+ VariableReferenceExpression v = (VariableReferenceExpression) expr;
+ LogicalVariable decor = v.getVariableReference();
+ if (p.first != null) {
+ target.addVariable(p.first);
+ } else {
+ target.addVariable(decor);
+ }
+ }
+ }
+ };
+
+ // Also propagate the variables produced by nested aggregate plans.
+ for (ILogicalPlan p : getNestedPlans()) {
+ if (p == null) {
+ continue;
+ }
+ for (Mutable<ILogicalOperator> o : p.getRoots()) {
+ AbstractLogicalOperator nestedOp = (AbstractLogicalOperator) o.getValue();
+ if (nestedOp.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+ groupJoinVarPropPolicy = VariablePropagationPolicy.concat(groupJoinVarPropPolicy,
+ nestedOp.getVariablePropagationPolicy());
+ }
+ }
+ }
+
+ return groupJoinVarPropPolicy;
+ }
+
+ @Override
+ public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+ boolean b = false;
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+ if (visitor.transform(p.second)) {
+ b = true;
+ }
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {
+ if (visitor.transform(p.second)) {
+ b = true;
+ }
+ }
+ return b;
+ }
+
+ public static LogicalVariable getDecorVariable(Pair<LogicalVariable, Mutable<ILogicalExpression>> p) {
+ if (p.first != null) {
+ return p.first;
+ } else {
+ VariableReferenceExpression e = (VariableReferenceExpression) p.second.getValue();
+ return e.getVariableReference();
+ }
+ }
+
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/NestedTupleSourceOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/NestedTupleSourceOperator.java
index a9a8f55..1c0cf22 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/NestedTupleSourceOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/NestedTupleSourceOperator.java
@@ -42,6 +42,10 @@
return dataSourceReference.getValue().getInputs().get(0).getValue();
}
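+ // Variant of getSourceOperator() that selects which input of the data source
+ // feeds this nested tuple source (a GroupJoin's nested plan reads from input 1).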
+ public ILogicalOperator getSourceOperator(int inputIdx) {
+ return dataSourceReference.getValue().getInputs().get(inputIdx).getValue();
+ }
+
@Override
public LogicalOperatorTag getOperatorTag() {
return LogicalOperatorTag.NESTEDTUPLESOURCE;
@@ -88,8 +92,11 @@
@Override
public IVariableTypeEnvironment getTypeEnv() {
- ILogicalOperator op = dataSourceReference.getValue().getInputs().get(0).getValue();
- return ctx.getOutputTypeEnvironment(op);
+ AbstractLogicalOperator dsr = (AbstractLogicalOperator) dataSourceReference.getValue();
+ // When the data source is a GroupJoin, its nested plan is typed against
+ // the second (group) input rather than the first.
+ if (dsr.getOperatorTag() == LogicalOperatorTag.GROUPJOIN) {
+ return ctx.getOutputTypeEnvironment(dsr.getInputs().get(1).getValue());
+ } else {
+ return ctx.getOutputTypeEnvironment(dsr.getInputs().get(0).getValue());
+ }
}
};
return new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getNullableTypeComputer(),
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 0539cbe..943a323 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -50,6 +50,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -305,6 +306,147 @@
}
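+ // A GroupJoin contributes FDs and equivalence classes from both the join part
+ // (condition, inner vs. left-outer) and the group-by part (inherited FDs projected
+ // onto the group-by/decor variables), then normalizes the group-by list against
+ // the inherited equivalence classes, mirroring visitGroupByOperator.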
@Override
+ public Void visitGroupJoinOperator(GroupJoinOperator op, IOptimizationContext ctx)
+ throws AlgebricksException {
+ Map<LogicalVariable, EquivalenceClass> equivalenceClasses = new HashMap<LogicalVariable, EquivalenceClass>();
+ List<FunctionalDependency> functionalDependencies = new ArrayList<FunctionalDependency>();
+ ctx.putEquivalenceClassMap(op, equivalenceClasses);
+ ctx.putFDList(op, functionalDependencies);
+
+ List<FunctionalDependency> inheritedFDs = new ArrayList<FunctionalDependency>();
+ for (ILogicalPlan p : op.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+ ILogicalOperator op2 = r.getValue();
+ equivalenceClasses.putAll(getOrComputeEqClasses(op2, ctx));
+ inheritedFDs.addAll(getOrComputeFDs(op2, ctx));
+ }
+ }
+
+ ILogicalOperator opLeft = op.getInputs().get(0).getValue();
+ inheritedFDs.addAll(getOrComputeFDs(opLeft, ctx));
+ ILogicalOperator opRight = op.getInputs().get(1).getValue();
+ inheritedFDs.addAll(getOrComputeFDs(opRight, ctx));
+
+ Map<LogicalVariable, EquivalenceClass> inheritedEcs = new HashMap<LogicalVariable, EquivalenceClass>();
+ inheritedEcs.putAll(getOrComputeEqClasses(opLeft, ctx));
+ inheritedEcs.putAll(getOrComputeEqClasses(opRight, ctx));
+
+ ILogicalExpression joinExpr = op.getCondition().getValue();
+
+ // Join part of GroupJoin
+ switch (op.getJoinKind()) {
+ case INNER:
+ joinExpr.getConstraintsAndEquivClasses(functionalDependencies, equivalenceClasses);
+ break;
+ case LEFT_OUTER:
+ Collection<LogicalVariable> leftSideVars;
+ if (opLeft.getSchema() == null) {
+ leftSideVars = new LinkedList<LogicalVariable>();
+ VariableUtilities.getLiveVariables(opLeft, leftSideVars);
+ // actually, not all produced vars. are visible (due to projection)
+ // so using cached schema is better and faster
+ } else {
+ leftSideVars = opLeft.getSchema();
+ }
+ joinExpr.getConstraintsForOuterJoin(functionalDependencies, leftSideVars);
+ break;
+ default:
+ throw new NotImplementedException();
+ }
+
+ // Group part of GroupJoin
+ for (FunctionalDependency inherited : inheritedFDs) {
+ boolean isCoveredByGbyOrDecorVars = true;
+ List<LogicalVariable> newHead = new ArrayList<LogicalVariable>(inherited.getHead().size());
+ for (LogicalVariable v : inherited.getHead()) {
+ LogicalVariable vnew = getNewGbyVar(op, v);
+ if (vnew == null) {
+ vnew = getNewDecorVar(op, v);
+ if (vnew == null) {
+ isCoveredByGbyOrDecorVars = false;
+ }
+ break;
+ }
+ newHead.add(vnew);
+ }
+
+ if (isCoveredByGbyOrDecorVars) {
+ List<LogicalVariable> newTail = new ArrayList<LogicalVariable>();
+ for (LogicalVariable v2 : inherited.getTail()) {
+ LogicalVariable v3 = getNewGbyVar(op, v2);
+ if (v3 != null) {
+ newTail.add(v3);
+ }
+ }
+ if (!newTail.isEmpty()) {
+ FunctionalDependency newFd = new FunctionalDependency(newHead, newTail);
+ functionalDependencies.add(newFd);
+ }
+ }
+ }
+
+ List<LogicalVariable> premiseGby = new LinkedList<LogicalVariable>();
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gByList = op.getGroupByList();
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+ premiseGby.add(p.first);
+ }
+
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList = op.getDecorList();
+
+ LinkedList<LogicalVariable> conclDecor = new LinkedList<LogicalVariable>();
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {
+ conclDecor.add(GroupJoinOperator.getDecorVariable(p));
+ }
+ if (!conclDecor.isEmpty()) {
+ functionalDependencies.add(new FunctionalDependency(premiseGby, conclDecor));
+ }
+
+ Set<LogicalVariable> gbySet = new HashSet<LogicalVariable>();
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+ ILogicalExpression expr = p.second.getValue();
+ if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ VariableReferenceExpression v = (VariableReferenceExpression) expr;
+ gbySet.add(v.getVariableReference());
+ }
+ }
+ LocalGroupingProperty lgp = new LocalGroupingProperty(gbySet);
+ lgp.normalizeGroupingColumns(inheritedEcs, inheritedFDs);
+ Set<LogicalVariable> normSet = lgp.getColumnSet();
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> newGbyList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+ boolean changed = false;
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+ ILogicalExpression expr = p.second.getValue();
+ if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+ LogicalVariable v2 = varRef.getVariableReference();
+ EquivalenceClass ec2 = inheritedEcs.get(v2);
+ LogicalVariable v3;
+ if (ec2 != null && !ec2.representativeIsConst()) {
+ v3 = ec2.getVariableRepresentative();
+ } else {
+ v3 = v2;
+ }
+ if (normSet.contains(v3)) {
+ newGbyList.add(p);
+ } else {
+ changed = true;
+ decorList.add(p);
+ }
+ } else {
+ newGbyList.add(p);
+ }
+ }
+ if (changed) {
+ AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Group-by list changed from "
+ + GroupJoinOperator.veListToString(gByList) + " to " + GroupJoinOperator.veListToString(newGbyList)
+ + ".\n");
+ }
+ gByList.clear();
+ gByList.addAll(newGbyList);
+ return null;
+ }
+
+ @Override
public Void visitInnerJoinOperator(InnerJoinOperator op, IOptimizationContext ctx) throws AlgebricksException {
Map<LogicalVariable, EquivalenceClass> equivalenceClasses = new HashMap<LogicalVariable, EquivalenceClass>();
List<FunctionalDependency> functionalDependencies = new ArrayList<FunctionalDependency>();
@@ -612,8 +754,16 @@
ctx.putFDList(op, fds);
}
- private LogicalVariable getNewGbyVar(GroupByOperator g, LogicalVariable v) {
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : g.getGroupByList()) {
+ private LogicalVariable getNewGbyVar(AbstractLogicalOperator op, LogicalVariable v) {
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList;
+ if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
+ groupByList = ((GroupByOperator) op).getGroupByList();
+ } else if (op.getOperatorTag() == LogicalOperatorTag.GROUPJOIN) {
+ groupByList = ((GroupJoinOperator) op).getGroupByList();
+ } else {
+ return null;
+ }
+
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : groupByList) {
ILogicalExpression e = p.second.getValue();
if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
LogicalVariable v2 = ((VariableReferenceExpression) e).getVariableReference();
@@ -625,8 +775,16 @@
return null;
}
- private LogicalVariable getNewDecorVar(GroupByOperator g, LogicalVariable v) {
- for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : g.getDecorList()) {
+ private LogicalVariable getNewDecorVar(AbstractLogicalOperator op, LogicalVariable v) {
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList;
+ if (op.getOperatorTag() == LogicalOperatorTag.GROUP) {
+ decorList = ((GroupByOperator) op).getDecorList();
+ } else if (op.getOperatorTag() == LogicalOperatorTag.GROUPJOIN) {
+ decorList = ((GroupJoinOperator) op).getDecorList();
+ } else {
+ return null;
+ }
+
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {
ILogicalExpression e = p.second.getValue();
if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
LogicalVariable v2 = ((VariableReferenceExpression) e).getVariableReference();
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 31061db..d5b577a 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -33,6 +33,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -42,6 +43,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -49,6 +51,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
@@ -189,6 +192,17 @@
}
@Override
+ public Boolean visitGroupJoinOperator(GroupJoinOperator op, ILogicalOperator arg)
+ throws AlgebricksException {
+ AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+ if (aop.getOperatorTag() != LogicalOperatorTag.GROUPJOIN)
+ return Boolean.FALSE;
+ GroupJoinOperator joinOpArg = (GroupJoinOperator) copyAndSubstituteVar(op, arg);
+ boolean isomorphic = op.getCondition().getValue().equals(joinOpArg.getCondition().getValue());
+ return isomorphic;
+ }
+
+ @Override
public Boolean visitInnerJoinOperator(InnerJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
if (aop.getOperatorTag() != LogicalOperatorTag.INNERJOIN)
@@ -636,6 +650,27 @@
}
@Override
+ public ILogicalOperator visitGroupJoinOperator(GroupJoinOperator op, Void arg)
+ throws AlgebricksException {
+
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+ ArrayList<ILogicalPlan> newSubplans = new ArrayList<ILogicalPlan>();
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : op.getGroupByList()) {
+ groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(pair.first,
+ deepCopyExpressionRef(pair.second)));
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : op.getDecorList()) {
+ decorList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(pair.first,
+ deepCopyExpressionRef(pair.second)));
+ }
+ for (ILogicalPlan plan : op.getNestedPlans()) {
+ newSubplans.add(IsomorphismOperatorVisitor.deepCopy(plan));
+ }
+
+ return new GroupJoinOperator(op.getJoinKind(), deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op
+ .getInputs().get(1), groupByList, decorList, newSubplans);
+ }
+
+ @Override
public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
return new InnerJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op
.getInputs().get(1));
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 562bb4c..bd27939 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -28,10 +28,11 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractAssignOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -40,6 +41,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -96,7 +98,7 @@
@Override
public Void visitGroupByOperator(GroupByOperator op, ILogicalOperator arg) throws AlgebricksException {
mapChildren(op, arg);
- mapVariablesForGroupBy(op, arg);
+ mapVariablesForGroupBy(op, arg, LogicalOperatorTag.GROUP);
mapVariablesInNestedPlans(op, arg);
return null;
}
@@ -114,6 +116,14 @@
}
@Override
+ public Void visitGroupJoinOperator(GroupJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
+ mapVariablesStandard(op, arg);
+ mapVariablesForGroupBy(op, arg, LogicalOperatorTag.GROUPJOIN);
+ mapVariablesInNestedPlans(op, arg);
+ return null;
+ }
+
+ @Override
public Void visitInnerJoinOperator(InnerJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
@@ -297,15 +307,27 @@
rightOp.getExpressions());
}
- private void mapVariablesForGroupBy(ILogicalOperator left, ILogicalOperator right) throws AlgebricksException {
- GroupByOperator leftOp = (GroupByOperator) left;
- GroupByOperator rightOp = (GroupByOperator) right;
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> leftPairs = leftOp.getGroupByList();
- List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> rightPairs = rightOp.getGroupByList();
- mapVarExprPairList(leftPairs, rightPairs);
- leftPairs = leftOp.getDecorList();
- rightPairs = rightOp.getDecorList();
- mapVarExprPairList(leftPairs, rightPairs);
+ private void mapVariablesForGroupBy(ILogicalOperator left, ILogicalOperator right, LogicalOperatorTag opTag) throws AlgebricksException {
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> lGByList;
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> rGByList;
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> lDecorList;
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> rDecorList;
+ if (opTag == LogicalOperatorTag.GROUP) {
+ lGByList = ((GroupByOperator) left).getGroupByList();
+ rGByList = ((GroupByOperator) right).getGroupByList();
+ lDecorList = ((GroupByOperator) left).getDecorList();
+ rDecorList = ((GroupByOperator) right).getDecorList();
+ } else if (opTag == LogicalOperatorTag.GROUPJOIN) {
+ lGByList = ((GroupJoinOperator) left).getGroupByList();
+ rGByList = ((GroupJoinOperator) right).getGroupByList();
+ lDecorList = ((GroupJoinOperator) left).getDecorList();
+ rDecorList = ((GroupJoinOperator) right).getDecorList();
+ } else {
+ return;
+ }
+ mapVarExprPairList(lGByList, rGByList);
+ mapVarExprPairList(lDecorList, rDecorList);
}
private void mapVarExprPairList(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> leftPairs,
@@ -349,8 +371,8 @@
}
private void mapVariablesInNestedPlans(ILogicalOperator opOrigin, ILogicalOperator arg) throws AlgebricksException {
- AbstractOperatorWithNestedPlans op = (AbstractOperatorWithNestedPlans) opOrigin;
- AbstractOperatorWithNestedPlans argOp = (AbstractOperatorWithNestedPlans) arg;
+ INestedPlan op = (INestedPlan) opOrigin;
+ INestedPlan argOp = (INestedPlan) arg;
List<ILogicalPlan> plans = op.getNestedPlans();
List<ILogicalPlan> plansArg = argOp.getNestedPlans();
if (plans.size() != plansArg.size())
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 9b2f5a0..36ea800 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -32,6 +32,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -122,6 +123,13 @@
}
@Override
+ public Void visitGroupJoinOperator(GroupJoinOperator op, IOptimizationContext arg)
+ throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
public Void visitInnerJoinOperator(InnerJoinOperator op, IOptimizationContext arg) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 994c6cb..406a4d7 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -120,6 +121,26 @@
}
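+ // A GroupJoin produces the variables defined by its nested (aggregate) plans
+ // plus any new group-by and decoration variables.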
@Override
+ public Void visitGroupJoinOperator(GroupJoinOperator op, Void arg) throws AlgebricksException {
+ for (ILogicalPlan p : op.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+ VariableUtilities.getProducedVariables(r.getValue(), producedVariables);
+ }
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : op.getGroupByList()) {
+ if (p.first != null) {
+ producedVariables.add(p.first);
+ }
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : op.getDecorList()) {
+ if (p.first != null) {
+ producedVariables.add(p.first);
+ }
+ }
+ return null;
+ }
+
+ @Override
public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
return null;
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 9295179..b72e427 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -24,8 +24,10 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -34,6 +36,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -127,6 +130,32 @@
}
@Override
+ public Void visitGroupJoinOperator(GroupJoinOperator op, Void arg) throws AlgebricksException {
+ standardLayout(op);
+ for (ILogicalPlan p : op.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+ VariableUtilities.getLiveVariables(r.getValue(), schemaVariables);
+ }
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : op.getGroupByList()) {
+ if (p.first != null) {
+ schemaVariables.add(p.first);
+ }
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : op.getDecorList()) {
+ if (p.first != null) {
+ schemaVariables.add(p.first);
+ } else {
+ ILogicalExpression e = p.second.getValue();
+ if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ schemaVariables.add(((VariableReferenceExpression) e).getVariableReference());
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
standardLayout(op);
return null;
@@ -152,7 +181,12 @@
@Override
public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
- VariableUtilities.getLiveVariables(op.getSourceOperator(), schemaVariables);
+ AbstractLogicalOperator sourceOp = (AbstractLogicalOperator) op.getDataSourceReference().getValue();
+ if (sourceOp.getOperatorTag() == LogicalOperatorTag.GROUPJOIN) {
+ VariableUtilities.getLiveVariables(op.getSourceOperator(1), schemaVariables);
+ } else {
+ VariableUtilities.getLiveVariables(op.getSourceOperator(), schemaVariables);
+ }
return null;
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 11e56ca..c8d81ca 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -36,6 +36,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -156,6 +157,22 @@
}
@Override
+ public Void visitGroupJoinOperator(GroupJoinOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+ throws AlgebricksException {
+ subst(pair.first, pair.second, op.getGroupByList());
+ subst(pair.first, pair.second, op.getDecorList());
+ for (ILogicalPlan p : op.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+ OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), pair.first,
+ pair.second, goThroughNts, ctx);
+ }
+ }
+ op.getCondition().getValue().substituteVar(pair.first, pair.second);
+ substVarTypes(op, pair);
+ return null;
+ }
+
+ @Override
public Void visitInnerJoinOperator(InnerJoinOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
op.getCondition().getValue().substituteVar(pair.first, pair.second);
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 56487cc..250dbcb 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -125,6 +126,23 @@
}
@Override
+ public Void visitGroupJoinOperator(GroupJoinOperator op, Void arg) throws AlgebricksException {
+ for (ILogicalPlan p : op.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+ VariableUtilities.getUsedVariablesInDescendantsAndSelf(r.getValue(), usedVariables);
+ }
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> g : op.getGroupByList()) {
+ g.second.getValue().getUsedVariables(usedVariables);
+ }
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> g : op.getDecorList()) {
+ g.second.getValue().getUsedVariables(usedVariables);
+ }
+ op.getCondition().getValue().getUsedVariables(usedVariables);
+ return null;
+ }
+
+ @Override
public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) {
op.getCondition().getValue().getUsedVariables(usedVariables);
return null;
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index c5f4c71..20141d2 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
@@ -88,7 +89,7 @@
}
protected AlgebricksPipeline[] compileSubplans(IOperatorSchema outerPlanSchema,
- AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, JobGenContext context)
+ INestedPlan npOp, IOperatorSchema opSchema, JobGenContext context)
throws AlgebricksException {
AlgebricksPipeline[] subplans = new AlgebricksPipeline[npOp.getNestedPlans().size()];
PlanCompiler pc = new PlanCompiler(context);
@@ -100,7 +101,7 @@
}
private AlgebricksPipeline buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema outerPlanSchema,
- AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, PlanCompiler pc) throws AlgebricksException {
+ INestedPlan npOp, IOperatorSchema opSchema, PlanCompiler pc) throws AlgebricksException {
if (p.getRoots().size() > 1) {
throw new NotImplementedException("Nested plans with several roots are not supported.");
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashGroupJoinPOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashGroupJoinPOperator.java
new file mode 100644
index 0000000..105d104
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/HybridHashGroupJoinPOperator.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.SerializableAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.join.HybridHashGroupJoinOperatorDescriptor;
+
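+/**
+ * Physical operator that evaluates a group-join using the hybrid-hash group-join runtime
+ * (HybridHashGroupJoinOperatorDescriptor): the hash-based equi-join and the grouped aggregation
+ * defined by the nested plans are performed by a single Hyracks operator.
+ */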
+public class HybridHashGroupJoinPOperator extends AbstractHashJoinPOperator {
+
+ private final int memSizeInFrames;
+ private final int maxInputBuildSizeInFrames;
+ private final int aveRecordsPerFrame;
+ private final double fudgeFactor;
+
+ public HybridHashGroupJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
+ List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities,
+ int memSizeInFrames, int maxInputSize0InFrames, int aveRecordsPerFrame, double fudgeFactor) {
+ super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
+ this.memSizeInFrames = memSizeInFrames;
+ this.maxInputBuildSizeInFrames = maxInputSize0InFrames;
+ this.aveRecordsPerFrame = aveRecordsPerFrame;
+ this.fudgeFactor = fudgeFactor;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.HYBRID_HASH_GROUP_JOIN;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ public double getFudgeFactor() {
+ return fudgeFactor;
+ }
+
+ public int getMemSizeInFrames() {
+ return memSizeInFrames;
+ }
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + " " + keysLeftBranch + keysRightBranch;
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ GroupJoinOperator opLogical = (GroupJoinOperator) op;
+ int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
+ int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+
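+ // For a LEFT_OUTER group-join, create one null-writer per variable produced by the nested
+ // aggregate plans, so groups without matching tuples can be emitted with null aggregate
+ // values; INNER needs no null writers.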
+ INullWriterFactory[] nullWriterFactories;
+ switch (kind) {
+ case INNER: {
+ nullWriterFactories = null;
+ break;
+ }
+ case LEFT_OUTER: {
+ Collection<LogicalVariable> nestedProdVars = new HashSet<LogicalVariable>();
+ for(ILogicalPlan p : opLogical.getNestedPlans()){
+ VariableUtilities.getProducedVariables(p.getRoots().get(0).getValue(), nestedProdVars);
+ }
+ nullWriterFactories = new INullWriterFactory[nestedProdVars.size()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = context.getNullWriterFactory();
+ }
+ break;
+ }
+ default: {
+ throw new NotImplementedException();
+ }
+ }
+
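+ // Hash and comparator factories are derived from the left-branch key variables; the same
+ // comparator factories are passed for both key sides when the descriptor is created below.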
+ IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
+ keysLeftBranch, env, context);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
+ int i = 0;
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ for (LogicalVariable v : keysLeftBranch) {
+ Object t = env.getVarType(v);
+ comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
+ }
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+ IOperatorDescriptor opDesc = null;
+
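+ // Each decor entry must be a plain variable reference; map it to its field index in the
+ // first input's schema.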
+ int numDecors = opLogical.getDecorList().size();
+ int[] decorKeys = new int[numDecors];
+ int j = 0;
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : opLogical.getDecorList()) {
+ ILogicalExpression expr = p.second.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new AlgebricksException("groupjoin expects variable references.");
+ }
+ VariableReferenceExpression v = (VariableReferenceExpression) expr;
+ LogicalVariable decor = v.getVariableReference();
+ decorKeys[j++] = inputSchemas[0].findVariable(decor);
+ }
+
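+ // Collect the AGGREGATE operators among the roots of the first nested plan and count the
+ // total number of aggregate expressions.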
+ int n = 0;
+ AggregateOperator aggOp;
+ List<AggregateOperator> aggOpList = new LinkedList<AggregateOperator>();
+ AbstractLogicalOperator nestedOp;
+
+ for(Mutable<ILogicalOperator> nOp : opLogical.getNestedPlans().get(0).getRoots()) {
+ nestedOp = (AbstractLogicalOperator) nOp.getValue();
+ if(nestedOp.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+ aggOp = (AggregateOperator) nestedOp;
+ n += aggOp.getExpressions().size();
+ aggOpList.add(aggOp);
+ }
+ }
+
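+ // Compile the nested plans against the second input's schema and create one serializable
+ // aggregate function factory per aggregate expression.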
+ ICopySerializableAggregateFunctionFactory[] aff = new ICopySerializableAggregateFunctionFactory[n];
+ IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+// ILogicalExpressionJobGen exprJobGen = context.getExpressionJobGen();
+ IVariableTypeEnvironment aggOpInputEnv;
+// IVariableTypeEnvironment outputEnv = context.getTypeEnvironment(op);
+ compileSubplans(inputSchemas[1], opLogical, propagatedSchema, context);
+ i = 0;
+ for(AggregateOperator a : aggOpList) {
+ aggOpInputEnv = context.getTypeEnvironment(a.getInputs().get(0).getValue());
+ for (Mutable<ILogicalExpression> exprRef : a.getExpressions()) {
+ AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) exprRef.getValue();
+ aff[i++] = expressionRuntimeProvider.createSerializableAggregateFunctionFactory(aggFun, aggOpInputEnv, inputSchemas,
+ context);
+// intermediateTypes.add(partialAggregationTypeComputer.getType(aggFun, aggOpInputEnv,
+// context.getMetadataProvider()));
+ }
+ }
+ IAggregatorDescriptorFactory aggregatorFactory = new SerializableAggregatorDescriptorFactory(aff);
+
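+ // Instantiate the hybrid-hash group-join descriptor, rewrapping Hyracks exceptions as
+ // AlgebricksExceptions.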
+ try{
+ opDesc = new HybridHashGroupJoinOperatorDescriptor(spec, memSizeInFrames, maxInputBuildSizeInFrames, aveRecordsPerFrame,
+ fudgeFactor, keysLeft, keysRight, decorKeys, hashFunFactories, comparatorFactories, comparatorFactories,
+ /* aggregatorFactory */ aggregatorFactory, recDescriptor, opLogical.getJoinKind() == JoinKind.LEFT_OUTER,
+ nullWriterFactories);
+ } catch(HyracksDataException e){
+ throw new AlgebricksException(e);
+ }
+
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+ ILogicalOperator src1 = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src1, 0, op, 0);
+ ILogicalOperator src2 = op.getInputs().get(1).getValue();
+ builder.contributeGraphEdge(src2, 0, op, 1);
+ }
+
+ @Override
+ protected List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op0 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ IPhysicalPropertiesVector pv0 = op0.getPhysicalOperator().getDeliveredProperties();
+ List<ILocalStructuralProperty> lp0 = pv0.getLocalProperties();
+ if (lp0 != null) {
+ // maintains the local properties on the probe side
+ return new LinkedList<ILocalStructuralProperty>(lp0);
+ }
+ return new LinkedList<ILocalStructuralProperty>();
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashGroupJoinPOperator.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashGroupJoinPOperator.java
new file mode 100644
index 0000000..ccbbe0b
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashGroupJoinPOperator.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.aggreg.SerializableAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashGroupJoinOperatorDescriptor;
+
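+/**
+ * Physical operator that evaluates a group-join with an in-memory hash table
+ * (InMemoryHashGroupJoinOperatorDescriptor), combining the equi-join and the grouped aggregation
+ * defined by the nested plans in a single Hyracks operator.
+ */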
+public class InMemoryHashGroupJoinPOperator extends AbstractHashJoinPOperator {
+
+ private final int tableSize;
+
+ /**
+ * Builds on the first operator and probes on the second.
+ */
+ public InMemoryHashGroupJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
+ List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities, int tableSize) {
+ super(kind, partitioningType, sideLeftOfEqualities, sideRightOfEqualities);
+ this.tableSize = tableSize;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.IN_MEMORY_HASH_GROUP_JOIN;
+ }
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + " " + keysLeftBranch + keysRightBranch;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ GroupJoinOperator opLogical = (GroupJoinOperator) op;
+ int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
+ int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+
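+ // For a LEFT_OUTER group-join, create one null-writer per variable produced by the nested
+ // aggregate plans, so groups without matching tuples can be emitted with null aggregate
+ // values; INNER needs no null writers.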
+ INullWriterFactory[] nullWriterFactories;
+ switch (kind) {
+ case INNER: {
+ nullWriterFactories = null;
+ break;
+ }
+ case LEFT_OUTER: {
+ Collection<LogicalVariable> nestedProdVars = new HashSet<LogicalVariable>();
+ for(ILogicalPlan p : opLogical.getNestedPlans()){
+ VariableUtilities.getProducedVariables(p.getRoots().get(0).getValue(), nestedProdVars);
+ }
+ nullWriterFactories = new INullWriterFactory[nestedProdVars.size()];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = context.getNullWriterFactory();
+ }
+ break;
+ }
+ default: {
+ throw new NotImplementedException();
+ }
+ }
+
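+ // Hash and comparator factories are derived from the left-branch key variables; the same
+ // comparator factories are passed for both key sides of the descriptor below.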
+ IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
+ keysLeftBranch, env, context);
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
+ int i = 0;
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ for (LogicalVariable v : keysLeftBranch) {
+ Object t = env.getVarType(v);
+ comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
+ }
+ RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+ IOperatorDescriptor opDesc = null;
+
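+ // Each decor entry must be a plain variable reference; map it to its field index in the
+ // first input's schema.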
+ int numDecors = opLogical.getDecorList().size();
+ int[] decorKeys = new int[numDecors];
+ int j = 0;
+ for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : opLogical.getDecorList()) {
+ ILogicalExpression expr = p.second.getValue();
+ if (expr.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ throw new AlgebricksException("groupjoin expects variable references.");
+ }
+ VariableReferenceExpression v = (VariableReferenceExpression) expr;
+ LogicalVariable decor = v.getVariableReference();
+ decorKeys[j++] = inputSchemas[0].findVariable(decor);
+ }
+
+/* if (opLogical.getNestedPlans().size() != 1) {
+ throw new AlgebricksException(
+ "External group-by currently works only for one nested plan with one root containing"
+ + "an aggregate and a nested-tuple-source.");
+ }
+ ILogicalPlan p0 = opLogical.getNestedPlans().get(0);
+ if (p0.getRoots().size() != 1) {
+ throw new AlgebricksException(
+ "External group-by currently works only for one nested plan with one root containing"
+ + "an aggregate and a nested-tuple-source.");
+ }
+*/
+
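+ // Collect the AGGREGATE operators among the roots of the first nested plan and count the
+ // total number of aggregate expressions.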
+ int n = 0;
+ AggregateOperator aggOp;
+ List<AggregateOperator> aggOpList = new LinkedList<AggregateOperator>();
+ AbstractLogicalOperator nestedOp;
+
+ for(Mutable<ILogicalOperator> nOp : opLogical.getNestedPlans().get(0).getRoots()) {
+ nestedOp = (AbstractLogicalOperator) nOp.getValue();
+ if(nestedOp.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+ aggOp = (AggregateOperator) nestedOp;
+ n += aggOp.getExpressions().size();
+ aggOpList.add(aggOp);
+ }
+ }
+
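+ // Compile the nested plans against the second input's schema, create one serializable
+ // aggregate function factory per aggregate expression, and build the in-memory group-join
+ // descriptor.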
+ ICopySerializableAggregateFunctionFactory[] aff = new ICopySerializableAggregateFunctionFactory[n];
+ IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+// ILogicalExpressionJobGen exprJobGen = context.getExpressionJobGen();
+ IVariableTypeEnvironment aggOpInputEnv;
+// IVariableTypeEnvironment outputEnv = context.getTypeEnvironment(op);
+ compileSubplans(inputSchemas[1], opLogical, propagatedSchema, context);
+ i = 0;
+ for(AggregateOperator a : aggOpList) {
+ aggOpInputEnv = context.getTypeEnvironment(a.getInputs().get(0).getValue());
+ for (Mutable<ILogicalExpression> exprRef : a.getExpressions()) {
+ AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) exprRef.getValue();
+ aff[i++] = expressionRuntimeProvider.createSerializableAggregateFunctionFactory(aggFun, aggOpInputEnv, inputSchemas,
+ context);
+// intermediateTypes.add(partialAggregationTypeComputer.getType(aggFun, aggOpInputEnv,
+// context.getMetadataProvider()));
+ }
+ }
+ IAggregatorDescriptorFactory aggregatorFactory = new SerializableAggregatorDescriptorFactory(aff);
+ opDesc = new InMemoryHashGroupJoinOperatorDescriptor(spec, keysLeft, keysRight, decorKeys, hashFunFactories,
+ comparatorFactories, comparatorFactories, /* aggregatorFactory */ aggregatorFactory, recDescriptor,
+ opLogical.getJoinKind() == JoinKind.LEFT_OUTER, nullWriterFactories, tableSize);
+
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+
+ ILogicalOperator src1 = op.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src1, 0, op, 0);
+ ILogicalOperator src2 = op.getInputs().get(1).getValue();
+ builder.contributeGraphEdge(src2, 0, op, 1);
+ }
+
+ @Override
+ protected List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context) {
+ AbstractLogicalOperator op0 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ IPhysicalPropertiesVector pv0 = op0.getPhysicalOperator().getDeliveredProperties();
+ List<ILocalStructuralProperty> lp0 = pv0.getLocalProperties();
+ if (lp0 != null) {
+ // maintains the local properties on the probe side
+ return new LinkedList<ILocalStructuralProperty>(lp0);
+ }
+ return new LinkedList<ILocalStructuralProperty>();
+ }
+
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index a94c78e..7ab457c 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint;
+import java.util.Collection;
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
@@ -23,7 +24,10 @@
import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -33,6 +37,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -55,6 +60,9 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisitor<String, Integer> {
@@ -104,6 +112,15 @@
}
@Override
+ public String visitGroupJoinOperator(GroupJoinOperator op, Integer indent) throws AlgebricksException {
+ StringBuilder buffer = new StringBuilder();
+ addIndent(buffer, indent).append("group join (").append(op.getCondition().getValue()).append(") gby (")
+ .append(op.gByListToString()).append(") decor (").append(op.decorListToString()).append(") {");
+ printNestedPlans(op, indent, buffer);
+ return buffer.toString();
+ }
+
+ @Override
public String visitInnerJoinOperator(InnerJoinOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("join (").append(op.getCondition().getValue()).append(")");
@@ -268,7 +285,7 @@
return buffer;
}
- private void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent, StringBuilder buffer)
+ private void printNestedPlans(INestedPlan op, Integer indent, StringBuilder buffer)
throws AlgebricksException {
boolean first = true;
if (op.getNestedPlans().isEmpty()) {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java
index aa96513..7265626 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/PlanPrettyPrinter.java
@@ -19,6 +19,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
@@ -61,7 +62,7 @@
pad(out, indent);
appendln(out, "-- " + pOp.toString() + " |" + op.getExecutionMode() + "|");
if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans opNest = (AbstractOperatorWithNestedPlans) op;
+ INestedPlan opNest = (INestedPlan) op;
for (ILogicalPlan p : opNest.getNestedPlans()) {
pad(out, indent + 8);
appendln(out, "{");
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 47a979c..d4ee3be 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -22,11 +22,11 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
@@ -148,7 +148,7 @@
}
}
if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans aonp = (AbstractOperatorWithNestedPlans) op;
+ INestedPlan aonp = (INestedPlan) op;
for (ILogicalPlan p : aonp.getNestedPlans()) {
for (Mutable<ILogicalOperator> ref : p.getRoots()) {
AbstractLogicalOperator aop = (AbstractLogicalOperator) ref.getValue();
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index cd8f042..05d8432 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -83,7 +84,7 @@
}
if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans s = (AbstractOperatorWithNestedPlans) op;
+ INestedPlan s = (INestedPlan) op;
for (ILogicalPlan p : s.getNestedPlans()) {
for (Mutable<ILogicalOperator> r : p.getRoots()) {
getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) r.getValue(), freeVars);
@@ -99,7 +100,7 @@
}
}
- public static void getFreeVariablesInSubplans(AbstractOperatorWithNestedPlans op, Set<LogicalVariable> freeVars)
+ public static void getFreeVariablesInSubplans(INestedPlan op, Set<LogicalVariable> freeVars)
throws AlgebricksException {
for (ILogicalPlan p : op.getNestedPlans()) {
for (Mutable<ILogicalOperator> r : p.getRoots()) {
@@ -127,7 +128,7 @@
computeSchemaAndPropertiesRecIfNull((AbstractLogicalOperator) i.getValue(), context);
}
if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans a = (AbstractOperatorWithNestedPlans) op;
+ INestedPlan a = (INestedPlan) op;
for (ILogicalPlan p : a.getNestedPlans()) {
for (Mutable<ILogicalOperator> r : p.getRoots()) {
computeSchemaAndPropertiesRecIfNull((AbstractLogicalOperator) r.getValue(), context);
@@ -145,7 +146,7 @@
computeSchemaRecIfNull((AbstractLogicalOperator) i.getValue());
}
if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans a = (AbstractOperatorWithNestedPlans) op;
+ INestedPlan a = (INestedPlan) op;
for (ILogicalPlan p : a.getNestedPlans()) {
for (Mutable<ILogicalOperator> r : p.getRoots()) {
computeSchemaRecIfNull((AbstractLogicalOperator) r.getValue());
@@ -195,7 +196,7 @@
typeOpRec(i, context);
}
if (op.hasNestedPlans()) {
- for (ILogicalPlan p : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
+ for (ILogicalPlan p : ((INestedPlan) op).getNestedPlans()) {
typePlan(p, context);
}
}
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 6b5949e..6f4ab15 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -57,6 +58,8 @@
public R visitLimitOperator(LimitOperator op, T arg) throws AlgebricksException;
+ public R visitGroupJoinOperator(GroupJoinOperator op, T arg) throws AlgebricksException;
+
public R visitInnerJoinOperator(InnerJoinOperator op, T arg) throws AlgebricksException;
public R visitLeftOuterJoinOperator(LeftOuterJoinOperator op, T arg) throws AlgebricksException;
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
index 3382c6e..251439d 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
@@ -102,7 +103,7 @@
}
if (op.hasNestedPlans() && enterNestedPlans) {
- AbstractOperatorWithNestedPlans o2 = (AbstractOperatorWithNestedPlans) op;
+ INestedPlan o2 = (INestedPlan) op;
for (ILogicalPlan p : o2.getNestedPlans()) {
for (Mutable<ILogicalOperator> r : p.getRoots()) {
if (rewriteOperatorRef(r, rule, enterNestedPlans, fullDFS)) {
diff --git a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index e6b296b..7611987 100644
--- a/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++ b/hyracks-algebricks/hyracks-algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -8,6 +8,7 @@
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
@@ -22,6 +23,7 @@
PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY, PhysicalOperatorTag.HDFS_READER,
PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN,
+ PhysicalOperatorTag.HYBRID_HASH_GROUP_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_GROUP_JOIN,
PhysicalOperatorTag.NESTED_LOOP, PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY,
PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY, PhysicalOperatorTag.SPLIT, PhysicalOperatorTag.STABLE_SORT,
PhysicalOperatorTag.UNION_ALL };
@@ -92,7 +94,7 @@
computeSchemaBottomUpForOp((AbstractLogicalOperator) i.getValue());
}
if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans a = (AbstractOperatorWithNestedPlans) op;
+ INestedPlan a = (INestedPlan) op;
for (ILogicalPlan p : a.getNestedPlans()) {
computeSchemaBottomUpForPlan(p);
}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index de2e6af..2954233 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -16,6 +16,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -233,7 +234,7 @@
}
if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
+ INestedPlan nested = (INestedPlan) op;
for (ILogicalPlan p : nested.getNestedPlans()) {
if (physOptimizePlan(p, required, true, context)) {
changed = true;
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 8a79d81..a31d7dd 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -113,7 +114,7 @@
++cnt;
}
if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans n = (AbstractOperatorWithNestedPlans) op;
+ INestedPlan n = (INestedPlan) op;
List<EquivalenceClass> eqc = equivClasses;
if (n.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
eqc = new LinkedList<EquivalenceClass>();
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
index f223844..5829ecb 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushProjectDownRule.java
@@ -28,6 +28,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -146,7 +147,7 @@
}
}
if (op2.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans n = (AbstractOperatorWithNestedPlans) op2;
+ INestedPlan n = (INestedPlan) op2;
for (ILogicalPlan p : n.getNestedPlans()) {
for (Mutable<ILogicalOperator> r : p.getRoots()) {
if (pushNeededProjections(toPush, r, context, initialOp)) {
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReinferAllTypesRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReinferAllTypesRule.java
index e18a380..6dd045a 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReinferAllTypesRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReinferAllTypesRule.java
@@ -5,6 +5,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
@@ -39,7 +40,7 @@
typeOpRec(i, context);
}
if (op.hasNestedPlans()) {
- for (ILogicalPlan p : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
+ for (ILogicalPlan p : ((INestedPlan) op).getNestedPlans()) {
typePlan(p, context);
}
}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
index c53ea0a..24dbad9 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/RemoveUnusedAssignAndAggregateRule.java
@@ -26,6 +26,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -74,7 +75,7 @@
removeUnusedAssigns(cRef, toRemove, context);
}
if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans opWithNest = (AbstractOperatorWithNestedPlans) op;
+ INestedPlan opWithNest = (INestedPlan) op;
Iterator<ILogicalPlan> planIter = opWithNest.getNestedPlans().iterator();
while (planIter.hasNext()) {
ILogicalPlan p = planIter.next();
@@ -135,7 +136,7 @@
collectUnusedAssignedVars((AbstractLogicalOperator) c.getValue(), toRemove, false, context);
}
if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans opWithNested = (AbstractOperatorWithNestedPlans) op;
+ INestedPlan opWithNested = (INestedPlan) op;
for (ILogicalPlan plan : opWithNested.getNestedPlans()) {
for (Mutable<ILogicalOperator> r : plan.getRoots()) {
collectUnusedAssignedVars((AbstractLogicalOperator) r.getValue(), toRemove, false, context);
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReplaceJoinGroupWithGroupJoinRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReplaceJoinGroupWithGroupJoinRule.java
new file mode 100644
index 0000000..4755347
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ReplaceJoinGroupWithGroupJoinRule.java
@@ -0,0 +1,334 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions.ComparisonKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
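+/**
+ * Rewrites a GROUP operator on top of an equi-join (INNERJOIN or LEFTOUTERJOIN) into a single
+ * GROUPJOIN operator. The rewrite applies when the roots of the group-by's nested plans are
+ * AGGREGATE (or ASSIGN/NESTEDTUPLESOURCE) operators, the group-by variables correspond to the
+ * join keys of the left branch, and all variables used by the aggregates are produced by the
+ * right branch; the nested plans are then re-rooted on the new group-join operator.
+ */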
+public class ReplaceJoinGroupWithGroupJoinRule implements IAlgebraicRewriteRule {
+ private static final Logger LOGGER = AlgebricksConfig.ALGEBRICKS_LOGGER;
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
+ try{
+ // Check if first op is GROUP
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ LOGGER.finest("GROUPJOIN_REPLACE: " + op.getOperatorTag().toString() + " " + op.hashCode());
+ if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "Not Group");
+ return false;
+ }
+
+ // Check if Group-By contains Aggregate Op
+ GroupByOperator gby = (GroupByOperator) op;
+ for(ILogicalPlan p : gby.getNestedPlans()){
+ for(Mutable<ILogicalOperator> o : p.getRoots()){
+ AbstractLogicalOperator nestedOp = (AbstractLogicalOperator) o.getValue();
+ boolean isOpAggOrNTS = false;
+ if(nestedOp.getOperatorTag() == LogicalOperatorTag.AGGREGATE){
+ AggregateOperator aggOp = (AggregateOperator)nestedOp;
+ if (aggOp.getVariables().size() < 1) {
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "No Agg variables!");
+ return false;
+ }
+ List<Mutable<ILogicalExpression>> expressions = aggOp.getExpressions();
+ for (int i = 0; i < expressions.size(); i++) {
+ AggregateFunctionCallExpression aggFun = (AggregateFunctionCallExpression) expressions.get(i).getValue();
+ if (aggFun.isTwoStep()){
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "TwoStep Agg");
+ //return false;
+ }
+ }
+ isOpAggOrNTS = true;
+ }
+ if(nestedOp.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE || nestedOp.getOperatorTag() == LogicalOperatorTag.ASSIGN)
+ isOpAggOrNTS = true;
+
+ if(!isOpAggOrNTS)
+ return false;
+ }
+
+ }
+
+ // Check if child op is a JOIN
+ Mutable<ILogicalOperator> opRef2 = op.getInputs().get(0);
+ AbstractLogicalOperator childOfOp = (AbstractLogicalOperator) opRef2.getValue();
+ LOGGER.finest("GROUPJOIN_REPLACE: " + childOfOp.getOperatorTag().toString() + " " + childOfOp.hashCode());
+
+ if (childOfOp.getOperatorTag() == LogicalOperatorTag.ASSIGN){
+ opRef2 = childOfOp.getInputs().get(0);
+ childOfOp = (AbstractLogicalOperator) opRef2.getValue();
+ LOGGER.finest("GROUPJOIN_REPLACE: " + childOfOp.getOperatorTag().toString() + " " + childOfOp.hashCode());
+ }
+
+ if (childOfOp.getOperatorTag() != LogicalOperatorTag.INNERJOIN
+ && childOfOp.getOperatorTag() != LogicalOperatorTag.LEFTOUTERJOIN) {
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "Not Join");
+ return false;
+ }
+
+
+ AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) opRef2.getValue();
+ Mutable<ILogicalOperator> joinBranchLeftRef = join.getInputs().get(0);
+ Mutable<ILogicalOperator> joinBranchRightRef = join.getInputs().get(1);
+ Collection<LogicalVariable> joinBranchLeftVars = new HashSet<LogicalVariable>();
+ Collection<LogicalVariable> joinBranchRightVars = new HashSet<LogicalVariable>();
+ List<ILogicalExpression> exprs = new LinkedList<ILogicalExpression>();
+ exprs.add(join.getCondition().getValue());
+ VariableUtilities.getLiveVariables(joinBranchRightRef.getValue(), joinBranchRightVars);
+
+ // Check if join condition is EQ
+ ILogicalExpression e;
+ AbstractFunctionCallExpression fexp;
+ FunctionIdentifier fi;
+ ComparisonKind ck;
+ while(!exprs.isEmpty()) {
+ e = exprs.get(0);
+ switch (e.getExpressionTag()) {
+ case FUNCTION_CALL: {
+ fexp = (AbstractFunctionCallExpression) e;
+ fi = fexp.getFunctionIdentifier();
+
+ if (fi.equals(AlgebricksBuiltinFunctions.AND)) {
+ for (Mutable<ILogicalExpression> a : fexp.getArguments()) {
+ exprs.add(a.getValue());
+ }
+ }
+ else {
+ ck = AlgebricksBuiltinFunctions.getComparisonType(fi);
+ if (ck != ComparisonKind.EQ) {
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "Not HashJoin1");
+ return false;
+ }
+ ILogicalExpression opLeft = fexp.getArguments().get(0).getValue();
+ ILogicalExpression opRight = fexp.getArguments().get(1).getValue();
+ if (opLeft.getExpressionTag() != LogicalExpressionTag.VARIABLE
+ || opRight.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "Not HashJoin2");
+ return false;
+ }
+/* LogicalVariable var1 = ((VariableReferenceExpression) opLeft).getVariableReference();
+ if(inLeftAll == null)
+ throw new AlgebricksException("inLeftAll is null");
+ if (inLeftAll.contains(var1)) {
+ if (!outLeftFields.contains(var1))
+ outLeftFields.add(var1);
+ } else if (inRightAll.contains(var1) && !outRightFields.contains(var1)) {
+ outRightFields.add(var1);
+ } else {
+ return false;
+ }
+ LogicalVariable var2 = ((VariableReferenceExpression) opRight).getVariableReference();
+ if (inLeftAll.contains(var2) && !outLeftFields.contains(var2)) {
+ outLeftFields.add(var2);
+ } else if (inRightAll.contains(var2) && !outRightFields.contains(var2)) {
+ outRightFields.add(var2);
+ } else {
+ return false;
+ }
+ return true;
+*/
+ }
+ exprs.remove(0);
+ break;
+ }
+ default: {
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "Not HashJoin3");
+ return false;
+ }
+ }
+ }
+
+ // check: 1) PK(or FD on PK) used in join; 2) if Group-By vars are in join expression
+ Collection<LogicalVariable> joinVars = new HashSet<LogicalVariable>();
+ List<LogicalVariable> groupVars = gby.getGbyVarList();
+ join.getCondition().getValue().getUsedVariables(joinVars);
+
+ joinVars.removeAll(joinBranchRightVars);
+ // 1. PK
+ boolean pkExists = false;
+ for (LogicalVariable v : joinVars) {
+ if(context.findPrimaryKey(v) != null) {
+ pkExists = true;
+ break;
+ }
+ }
+ if (!pkExists){
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "No PK");
+// return false;
+ }
+
+ // 2. Group-By vars
+ // check groupVars.size = joinVars.size
+ if (groupVars.size() != joinVars.size()){
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "Not |GroupVars| == |JoinVars| 1");
+ return false;
+ }
+ for (LogicalVariable v : groupVars){
+ if (!joinVars.remove(v)){
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "Not |GroupVars| == |JoinVars| 2");
+ LOGGER.finest("GROUPJOIN_REPLACE: " + v.getId());
+ if(context.findPrimaryKey(v) == null)
+ return false;
+ }
+ }
+ if (!joinVars.isEmpty()){
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "Not |GroupVars| == |JoinVars| 3");
+ return false;
+ }
+ // check if Aggregate vars are in join right branch
+ Collection<LogicalVariable> aggrVariables = new HashSet<LogicalVariable>();
+ for (ILogicalPlan p : gby.getNestedPlans()){
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+ AbstractLogicalOperator op1Abs = (AbstractLogicalOperator) r.getValue();
+ if (op1Abs.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+ AggregateOperator op1 = (AggregateOperator) op1Abs;
+ //Collection<LogicalVariable> usedVariables = new HashSet<LogicalVariable>();
+ VariableUtilities.getUsedVariables(op1, aggrVariables);
+ }
+ }
+ }
+ if (!joinBranchRightVars.containsAll(aggrVariables)){
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "Not AggrVar in Right branch");
+ return false;
+ }
+ // create GroupJoin op and replace in plan
+ // make same as merge of Join and Group constructors
+
+ List<ILogicalPlan> nestedPlans = gby.getNestedPlans();
+ context.computeAndSetTypeEnvironmentForOperator(joinBranchLeftRef.getValue());
+ context.computeAndSetTypeEnvironmentForOperator(joinBranchRightRef.getValue());
+ GroupJoinOperator groupJoinOp = new GroupJoinOperator(join.getJoinKind(), join.getCondition(), joinBranchLeftRef, joinBranchRightRef,
+ gby.getGroupByList(), gby.getDecorList(), nestedPlans);
+ boolean foundNTS = true;
+ for(ILogicalPlan p : nestedPlans){
+ Mutable<ILogicalOperator> oRef = p.getRoots().get(0);
+ AbstractLogicalOperator absOp = (AbstractLogicalOperator)oRef.getValue();
+ foundNTS &= updateNTSInNestedPlan(absOp, groupJoinOp, context);
+/* if(absOp.getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE){
+ absOp = (AbstractLogicalOperator)absOp.getInputs().get(0).getValue();
+ }
+ else{
+ NestedTupleSourceOperator nts = (NestedTupleSourceOperator)absOp;
+ nts.getDataSourceReference().setValue(groupJoinOp);
+ context.computeAndSetTypeEnvironmentForOperator(nts);
+ foundNTS = true;
+ AggregateOperator agg = (AggregateOperator)absOp;
+ List<Mutable<ILogicalOperator>> aggInpList = agg.getInputs();
+ aggInpList.clear();
+ aggInpList.add(new MutableObject<ILogicalOperator>(nts));
+ ILogicalPlan np1 = new ALogicalPlanImpl(oRef);
+ nestedPlans.add(np1);
+ context.computeAndSetTypeEnvironmentForOperator(agg);
+ OperatorPropertiesUtil.typeOpRec(oRef, context);
+ }
+*/ }
+ if(!foundNTS)
+ return false;
+
+ groupJoinOp.setNestedPlans(nestedPlans);
+
+ // context.addToDontApplySet(new IntroduceCombinerRule(), groupJoinOp);
+ context.computeAndSetTypeEnvironmentForOperator(groupJoinOp);
+
+ LOGGER.finest("GROUPJOIN_REPLACE: GroupJoin nestedPlans size = " + groupJoinOp.getNestedPlans().size());
+
+ for (ILogicalPlan p : groupJoinOp.getNestedPlans()){
+ LOGGER.finest("GROUPJOIN_REPLACE: GroupJoin nestedPlan root size = " + p.getRoots().size());
+ for (Mutable<ILogicalOperator> r : p.getRoots()) {
+ AbstractLogicalOperator op1Abs = (AbstractLogicalOperator) r.getValue();
+ if (op1Abs != null) {
+ LOGGER.finest("GROUPJOIN_REPLACE: GroupJoin nestedOp = " + op1Abs.getOperatorTag().name());
+ Collection<LogicalVariable> vars = new HashSet<LogicalVariable>();
+ VariableUtilities.getLiveVariables(op1Abs, vars);
+ for(LogicalVariable v : vars){
+ LOGGER.finest("\t** GROUPJOIN_REPLACE: var = " + v.getId());
+ }
+ for(Mutable<ILogicalOperator> mo: op1Abs.getInputs()){
+ AbstractLogicalOperator childAbs = (AbstractLogicalOperator) mo.getValue();
+ LOGGER.finest("\t** GROUPJOIN_REPLACE: childOp = " + childAbs.getOperatorTag().name());
+ Collection<LogicalVariable> childVars = new HashSet<LogicalVariable>();
+ VariableUtilities.getLiveVariables(childAbs, childVars);
+ for(LogicalVariable v : childVars){
+ LOGGER.finest("\t\t** GROUPJOIN_REPLACE: var = " + v.getId());
+ }
+ }
+ }
+ }
+ }
+
+ opRef.setValue(groupJoinOp);
+
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "GroupJoin match");
+ return true;
+ } catch (Exception e){
+ LOGGER.finest("GROUPJOIN_REPLACE: Error = " + e.getMessage());
+ return false;
+ }
+ }
+
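+ /**
+ * Walks down the nested plan along the first inputs until the NESTEDTUPLESOURCE is reached,
+ * re-points its data source at the new group-join operator, and recomputes the type
+ * environments of the operators on the way back up.
+ */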
+ private boolean updateNTSInNestedPlan(AbstractLogicalOperator op,
+ GroupJoinOperator gjOp, IOptimizationContext context) throws AlgebricksException{
+ LOGGER.finest("GROUPJOIN_REPLACE: " + "NestedPlan \t" + op.getOperatorTag().toString());
+ if(op.getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE){
+ boolean foundNTS = updateNTSInNestedPlan((AbstractLogicalOperator)op.getInputs().get(0).getValue(), gjOp, context);
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ return foundNTS;
+ }
+ else{
+ NestedTupleSourceOperator nts = (NestedTupleSourceOperator)op;
+ nts.getDataSourceReference().setValue(gjOp);
+ context.computeAndSetTypeEnvironmentForOperator(nts);
+ return true;
+ }
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 38cf96e..e9a1034 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -12,6 +12,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
@@ -27,6 +28,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
@@ -63,6 +65,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.WriteResultPOperator;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.rewriter.util.GroupJoinUtils;
import edu.uci.ics.hyracks.algebricks.rewriter.util.JoinUtils;
public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule {
@@ -163,6 +166,11 @@
}
break;
}
+ case GROUPJOIN: {
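+ // GroupJoin picks its physical operator (hybrid or in-memory hash group-join) via GroupJoinUtils.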
+ GroupJoinUtils.setJoinAlgorithmAndExchangeAlgo((GroupJoinOperator) op, context);
+// System.exit(0);
+ break;
+ }
case INNERJOIN: {
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context);
break;
@@ -282,7 +290,7 @@
}
}
if (op.hasNestedPlans()) {
- AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
+ INestedPlan nested = (INestedPlan) op;
for (ILogicalPlan p : nested.getNestedPlans()) {
setPhysicalOperators(p, false, context);
}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/GroupJoinUtils.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/GroupJoinUtils.java
new file mode 100644
index 0000000..beccf45
--- /dev/null
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/GroupJoinUtils.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation.BroadcastSide;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions.ComparisonKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalPropertiesVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HybridHashGroupJoinPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HybridHashJoinPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryHashGroupJoinPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryHashJoinPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.NLJoinPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
+
+public class GroupJoinUtils {
+
+ private final static int MB = 1048576;
+
+ private final static double DEFAULT_FUDGE_FACTOR = 1.3;
+ private final static int MAX_RECORDS_PER_FRAME = 512;
+ private final static int DEFAULT_FRAME_SIZE = 32768;
+ private final static int MAX_LEFT_INPUT_SIZE_HYBRID_HASH = (int) (140L * 1024 * MB / DEFAULT_FRAME_SIZE);
+ private final static int DEFAULT_MEMORY_SIZE_HYBRID_HASH = (int) (256L * MB / DEFAULT_FRAME_SIZE);
+
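+ // Chooses a hash-based group-join physical operator when the join condition is a conjunction of
+ // variable-to-variable equalities between the two branches; any other condition is rejected.
+ // A broadcast hint on the right-hand side switches to broadcast partitioning.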
+ public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ List<LogicalVariable> sideLeft = new LinkedList<LogicalVariable>();
+ List<LogicalVariable> sideRight = new LinkedList<LogicalVariable>();
+ List<LogicalVariable> varsLeft = op.getInputs().get(0).getValue().getSchema();
+ List<LogicalVariable> varsRight = op.getInputs().get(1).getValue().getSchema();
+ if (isHashJoinCondition(op.getCondition().getValue(), varsLeft, varsRight, sideLeft, sideRight)) {
+ BroadcastSide side = getBroadcastJoinSide(op.getCondition().getValue(), varsLeft, varsRight);
+ if (side == null) {
+ setHashGroupJoinOp(op, JoinPartitioningType.PAIRWISE, sideLeft, sideRight, context);
+ } else {
+ switch (side) {
+ case RIGHT:
+ setHashGroupJoinOp(op, JoinPartitioningType.BROADCAST, sideLeft, sideRight, context);
+ break;
+ case LEFT:
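+ // Input swapping for a LEFT broadcast hint is currently disabled, so fall back to pairwise partitioning.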
+ //Mutable<ILogicalOperator> opRef0 = op.getInputs().get(0);
+ //Mutable<ILogicalOperator> opRef1 = op.getInputs().get(1);
+ //ILogicalOperator tmp = opRef0.getValue();
+ //opRef0.setValue(opRef1.getValue());
+ //opRef1.setValue(tmp);
+ setHashGroupJoinOp(op, JoinPartitioningType.PAIRWISE, sideLeft, sideRight, context);
+ break;
+ default:
+ setHashGroupJoinOp(op, JoinPartitioningType.PAIRWISE, sideLeft, sideRight, context);
+ }
+ }
+ } else {
+ throw new AlgebricksException("Not a hash join: Group Join is not suitable");
+ }
+ }
+
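+ // Installs a HybridHashGroupJoinPOperator with the default memory budget; for broadcast partitioning
+ // the operator may later be downgraded to an in-memory hash group-join if the build side fits in memory.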
+ private static void setHashGroupJoinOp(AbstractBinaryJoinOperator op, JoinPartitioningType partitioningType,
+ List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IOptimizationContext context)
+ throws AlgebricksException {
+ op.setPhysicalOperator(new HybridHashGroupJoinPOperator(op.getJoinKind(), partitioningType, sideLeft, sideRight,
+ DEFAULT_MEMORY_SIZE_HYBRID_HASH, MAX_LEFT_INPUT_SIZE_HYBRID_HASH, MAX_RECORDS_PER_FRAME,
+ DEFAULT_FUDGE_FACTOR));
+ if (partitioningType == JoinPartitioningType.BROADCAST) {
+ hybridToInMemHashGroupJoin(op, context);
+ }
+ // op.setPhysicalOperator(new
+ // InMemoryHashJoinPOperator(op.getJoinKind(), partitioningType,
+ // sideLeft, sideRight,
+ // 1024 * 512));
+ }
+
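+ // If the logical properties of the build branch show it fits in the hybrid operator's memory budget,
+ // replace the hybrid operator with an in-memory hash group-join sized at twice the estimated tuple count.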
+ private static void hybridToInMemHashGroupJoin(AbstractBinaryJoinOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ ILogicalOperator opBuild = op.getInputs().get(0).getValue();
+ LogicalPropertiesVisitor.computeLogicalPropertiesDFS(opBuild, context);
+ ILogicalPropertiesVector v = context.getLogicalPropertiesVector(opBuild);
+ AlgebricksConfig.ALGEBRICKS_LOGGER.fine("// HybridHashGroupJoin inner branch -- Logical properties for " + opBuild
+ + ": " + v + "\n");
+ if (v != null) {
+ int size2 = v.getMaxOutputFrames();
+ // Yingyi: need a way to calculate the memory taken by the aggregators; for now it is approximated by a factor of 1
+ int aggregatorSize = 1;
+ HybridHashGroupJoinPOperator hhgj = (HybridHashGroupJoinPOperator) op.getPhysicalOperator();
+ if (size2 > 0 && size2 * hhgj.getFudgeFactor() * aggregatorSize <= hhgj.getMemSizeInFrames()) {
+ AlgebricksConfig.ALGEBRICKS_LOGGER.fine("// HybridHashGroupJoin inner branch " + opBuild
+ + " fits in memory\n");
+ // maintains the local properties on the probe side
+ op.setPhysicalOperator(new InMemoryHashGroupJoinPOperator(hhgj.getKind(), hhgj.getPartitioningType(), hhgj
+ .getKeysLeftBranch(), hhgj.getKeysRightBranch(), v.getNumberOfTuples() * 2));
+ }
+ }
+
+ }
+
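+ // Returns true iff the expression is an equality between one variable from each branch, or an AND of
+ // such equalities; the matched key variables are collected into outLeftFields and outRightFields.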
+ public static boolean isHashJoinCondition(ILogicalExpression e, Collection<LogicalVariable> inLeftAll,
+ Collection<LogicalVariable> inRightAll, Collection<LogicalVariable> outLeftFields,
+ Collection<LogicalVariable> outRightFields) {
+ switch (e.getExpressionTag()) {
+ case FUNCTION_CALL: {
+ AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) e;
+ FunctionIdentifier fi = fexp.getFunctionIdentifier();
+ if (fi.equals(AlgebricksBuiltinFunctions.AND)) {
+ for (Mutable<ILogicalExpression> a : fexp.getArguments()) {
+ if (!isHashJoinCondition(a.getValue(), inLeftAll, inRightAll, outLeftFields,
+ outRightFields)) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ ComparisonKind ck = AlgebricksBuiltinFunctions.getComparisonType(fi);
+ if (ck != ComparisonKind.EQ) {
+ return false;
+ }
+ ILogicalExpression opLeft = fexp.getArguments().get(0).getValue();
+ ILogicalExpression opRight = fexp.getArguments().get(1).getValue();
+ if (opLeft.getExpressionTag() != LogicalExpressionTag.VARIABLE
+ || opRight.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+ return false;
+ }
+ LogicalVariable var1 = ((VariableReferenceExpression) opLeft).getVariableReference();
+ if (inLeftAll.contains(var1) && !outLeftFields.contains(var1)) {
+ outLeftFields.add(var1);
+ } else if (inRightAll.contains(var1) && !outRightFields.contains(var1)) {
+ outRightFields.add(var1);
+ } else {
+ return false;
+ }
+ LogicalVariable var2 = ((VariableReferenceExpression) opRight).getVariableReference();
+ if (inLeftAll.contains(var2) && !outLeftFields.contains(var2)) {
+ outLeftFields.add(var2);
+ } else if (inRightAll.contains(var2) && !outRightFields.contains(var2)) {
+ outRightFields.add(var2);
+ } else {
+ return false;
+ }
+ return true;
+ }
+ }
+ default: {
+ return false;
+ }
+ }
+ }
+
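+ // Reads the broadcast hint annotation, if present, and maps it to the branch that actually produces
+ // the variables used by the annotated argument.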
+ public static BroadcastSide getBroadcastJoinSide(ILogicalExpression e, List<LogicalVariable> varsLeft,
+ List<LogicalVariable> varsRight) {
+ if (e.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+ return null;
+ }
+ AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) e;
+ IExpressionAnnotation ann = fexp.getAnnotations().get(BroadcastExpressionAnnotation.BROADCAST_ANNOTATION_KEY);
+ if (ann == null) {
+ return null;
+ }
+ BroadcastSide side = (BroadcastSide) ann.getObject();
+ if (side == null) {
+ return null;
+ }
+ int i;
+ switch (side) {
+ case LEFT:
+ i = 0;
+ break;
+ case RIGHT:
+ i = 1;
+ break;
+ default:
+ return null;
+ }
+ ArrayList<LogicalVariable> vars = new ArrayList<LogicalVariable>();
+ fexp.getArguments().get(i).getValue().getUsedVariables(vars);
+ if (varsLeft.containsAll(vars)) {
+ return BroadcastSide.LEFT;
+ } else if (varsRight.containsAll(vars)) {
+ return BroadcastSide.RIGHT;
+ } else {
+ return null;
+ }
+ }
+}
diff --git a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/PhysicalOptimizationsUtil.java b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/PhysicalOptimizationsUtil.java
index d7ebfb4..9f0ff4f 100644
--- a/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/PhysicalOptimizationsUtil.java
+++ b/hyracks-algebricks/hyracks-algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/util/PhysicalOptimizationsUtil.java
@@ -8,6 +8,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.INestedPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
@@ -32,7 +33,7 @@
computeFDsAndEqClassesWithVisitorRec((AbstractLogicalOperator) i.getValue(), ctx, visitor, visitSet);
}
if (op.hasNestedPlans()) {
- for (ILogicalPlan p : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
+ for (ILogicalPlan p : ((INestedPlan) op).getNestedPlans()) {
for (Mutable<ILogicalOperator> r : p.getRoots()) {
AbstractLogicalOperator rootOp = (AbstractLogicalOperator) r.getValue();
computeFDsAndEqClassesWithVisitorRec(rootOp, ctx, visitor, visitSet);
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index 4d5d3d6..6ca4b1b 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -47,7 +47,6 @@
public void init(ArrayTupleBuilder tb, IFrameTupleAccessor accessor, int tIndex, AggregateState state)
throws HyracksDataException {
DataOutput output = tb.getDataOutput();
- ftr.reset(accessor, tIndex);
for (int i = 0; i < aggs.length; i++) {
try {
int begin = tb.getSize();
@@ -62,17 +61,19 @@
}
}
- // doing initial aggregate
- ftr.reset(accessor, tIndex);
- for (int i = 0; i < aggs.length; i++) {
- try {
- byte[] data = tb.getByteArray();
- int prevFieldPos = i + keys.length - 1;
- int start = prevFieldPos >= 0 ? tb.getFieldEndOffsets()[prevFieldPos] : 0;
- aggs[i].step(ftr, data, start, stateFieldLength[i]);
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
+ // do the initial aggregate step only if an accessor is provided; the group-join passes a null
+ // accessor when it initializes an empty state for an unmatched left-outer group
+ if(accessor != null) {
+ ftr.reset(accessor, tIndex);
+ for (int i = 0; i < aggs.length; i++) {
+ try {
+ byte[] data = tb.getByteArray();
+ int prevFieldPos = i + keys.length - 1;
+ int start = prevFieldPos >= 0 ? tb.getFieldEndOffsets()[prevFieldPos] : 0;
+ aggs[i].step(ftr, data, start, stateFieldLength[i]);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ }
}
}
@@ -144,4 +145,4 @@
};
}
-}
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GroupJoinHelper.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GroupJoinHelper.java
new file mode 100644
index 0000000..7858316
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GroupJoinHelper.java
@@ -0,0 +1,396 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameConstants;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.std.group.AggregateState;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.structures.ISerializableTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
+import edu.uci.ics.hyracks.dataflow.std.structures.TuplePointer;
+
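+/**
+ * In-memory hash table driving the group-join: build() registers the group-by input keyed on the group
+ * fields, insert() probes it with the other input and aggregates matches in place, and write() emits one
+ * result tuple per group. Each stored group tuple carries two trailing integer fields that point
+ * (frame index, tuple index) to its aggregate state.
+ */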
+public class GroupJoinHelper{
+
+ private final IHyracksTaskContext ctx;
+
+ private final ArrayList<ByteBuffer> buffers;
+ private final ArrayList<ByteBuffer> stateBuffers;
+ /**
+ * Aggregate states: a list of states for all groups maintained in the main
+ * memory.
+ */
+ private int lastBIndex;
+ private int lastStateBIndex;
+ private final int[] storedKeys;
+ private final int[] keys;
+ private final int[] decors;
+ private final int[] keysAndDecors;
+ private final IBinaryComparator[] comparators;
+ private final ITuplePartitionComputer tpc;
+ private final IAggregatorDescriptor aggregator;
+ private final FrameTupleAppender appender, stateAppender;
+ private final FrameTupleAccessor stateAccessor, storedKeysAccessor;
+ private final ArrayTupleBuilder groupTupleBuilder, stateTupleBuilder, outputTupleBuilder, nullTupleBuilder;
+ private final ITuplePartitionComputer tpc1;
+ protected final FrameTuplePairComparator ftpc1;
+ private final int tableSize;
+ private final ISerializableTable hashTable;
+ private final boolean isLeftOuter;
+ private final INullWriter[] nullWriters;
+ private AggregateState state;
+
+ public GroupJoinHelper(IHyracksTaskContext ctx, int[] gFields, int[] jFields, int[] dFields,
+ IBinaryComparatorFactory[] comparatorFactories, ITuplePartitionComputerFactory gByTpc0,
+ ITuplePartitionComputerFactory gByTpc1, IAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, boolean isLeftOuter,
+ INullWriter[] nullWriters1, int tableSize) throws HyracksDataException {
+
+ this.ctx = ctx;
+
+ buffers = new ArrayList<ByteBuffer>();
+ stateBuffers = new ArrayList<ByteBuffer>();
+
+ this.keys = gFields;
+ this.storedKeys = new int[keys.length];
+ this.decors = dFields;
+ this.keysAndDecors = new int[keys.length + decors.length];
+
+ @SuppressWarnings("rawtypes")
+ ISerializerDeserializer[] storedKeySerDeser = new ISerializerDeserializer[keys.length + decors.length + 2];
+ for (int i = 0; i < keys.length; ++i) {
+ storedKeys[i] = i;
+ storedKeySerDeser[i] = inRecordDescriptor.getFields()[keys[i]];
+ keysAndDecors[i] = i;
+ }
+ for (int i = 0; i < decors.length; ++i) {
+ keysAndDecors[keys.length + i] = keys.length + i;
+ storedKeySerDeser[keys.length + i] = inRecordDescriptor.getFields()[decors[i]];
+ }
+ storedKeySerDeser[keys.length + decors.length] = IntegerSerializerDeserializer.INSTANCE;
+ storedKeySerDeser[keys.length + decors.length + 1] = IntegerSerializerDeserializer.INSTANCE;
+
+ RecordDescriptor storedKeysRecordDescriptor = new RecordDescriptor(storedKeySerDeser);
+ storedKeysAccessor = new FrameTupleAccessor(ctx.getFrameSize(), storedKeysRecordDescriptor);
+
+ comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ tpc = gByTpc0.createPartitioner();
+
+ this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keys, storedKeys);
+
+ stateAccessor = new FrameTupleAccessor(ctx.getFrameSize(), outRecordDescriptor);
+
+ lastBIndex = -1;
+ lastStateBIndex = -1;
+
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ stateAppender = new FrameTupleAppender(ctx.getFrameSize());
+
+ addNewBuffer(false);
+ addNewBuffer(true);
+
+ groupTupleBuilder = new ArrayTupleBuilder(keysAndDecors.length + 2);
+ stateTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length - decors.length);
+ outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
+
+ tpc1 = gByTpc1.createPartitioner();
+ ftpc1 = new FrameTuplePairComparator(storedKeys, jFields, comparators);
+ this.tableSize = tableSize;
+
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriters = nullWriters1;
+ if (isLeftOuter) {
+ nullTupleBuilder = new ArrayTupleBuilder(nullWriters1.length);
+ DataOutput out = nullTupleBuilder.getDataOutput();
+ for (int i = 0; i < nullWriters1.length; i++) {
+ nullWriters1[i].writeNull(out);
+ nullTupleBuilder.addFieldEndOffset();
+ }
+ } else {
+ nullTupleBuilder = null;
+ }
+
+
+ hashTable = new SerializableHashTable(tableSize, ctx);
+ }
+
+ private void addNewBuffer(boolean isStateBuffer) {
+ ByteBuffer buffer = ctx.allocateFrame();
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ if (!isStateBuffer) {
+ buffers.add(buffer);
+ appender.reset(buffer, true);
+ ++lastBIndex;
+ } else {
+ stateBuffers.add(buffer);
+ stateAppender.reset(buffer, true);
+ ++lastStateBIndex;
+ }
+ }
+
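+ // Overwrites the two trailing integer fields of a stored group tuple with the frame/tuple position of
+ // its aggregate state.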
+ private void updateAggregateIndex(FrameTupleAccessor accessor0, TuplePointer tp1, TuplePointer tp2) {
+
+ int tStart = accessor0.getTupleStartOffset(tp1.tupleIndex);
+ int fStartOffset = accessor0.getFieldSlotsLength() + tStart;
+
+ int fieldCount = accessor0.getFieldCount();
+ int fStart = accessor0.getFieldStartOffset(tp1.tupleIndex, fieldCount - 2);
+
+ accessor0.getBuffer().putInt(fStart + fStartOffset, tp2.frameIndex);
+ fStart = accessor0.getFieldStartOffset(tp1.tupleIndex, fieldCount - 1);
+ accessor0.getBuffer().putInt(fStart + fStartOffset, tp2.tupleIndex);
+ }
+
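+ // Reads back the (frameIndex, tupleIndex) pointer stored in the two trailing fields of a group tuple.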
+ private TuplePointer getAggregateIndex(FrameTupleAccessor accessor0, int tIndex) {
+
+ TuplePointer tp = new TuplePointer();
+
+ int tStart = accessor0.getTupleStartOffset(tIndex);
+ int fStartOffset = accessor0.getFieldSlotsLength() + tStart;
+ int fieldCount = accessor0.getFieldCount();
+ int fStart = accessor0.getFieldStartOffset(tIndex, fieldCount - 2);
+
+ tp.frameIndex = accessor0.getBuffer().getInt(fStart + fStartOffset);
+ fStart = accessor0.getFieldStartOffset(tIndex, fieldCount - 1);
+ tp.tupleIndex = accessor0.getBuffer().getInt(fStart + fStartOffset);
+ return tp;
+ }
+
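+ // Debug helper: prints the raw bytes of a single field of a tuple.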
+ public void getField(FrameTupleAccessor accessor, int tIndex, int fIndex) {
+ int startOffset = accessor.getTupleStartOffset(tIndex);
+ int fStartOffset = accessor.getFieldStartOffset(tIndex, fIndex);
+ int fLen = accessor.getFieldEndOffset(tIndex, fIndex) - fStartOffset;
+ System.out.print(" |");
+ for (int i=startOffset + accessor.getFieldSlotsLength() + fStartOffset; i < startOffset + accessor.getFieldSlotsLength() + fStartOffset + fLen; i++)
+ System.out.print(" " + accessor.getBuffer().get(i));
+ }
+
+
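+ // Hashes every tuple of the group-by input on the group keys, stores its key and decor fields followed
+ // by a (-1, -1) placeholder state pointer, and registers the stored tuple in the hash table.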
+ public void build(FrameTupleAccessor accessor, ByteBuffer buffer) throws HyracksDataException, IOException {
+ accessor.reset(buffer);
+ int tCount = accessor.getTupleCount();
+ int entry;
+ TuplePointer storedTuplePointer = new TuplePointer();
+
+ for (int tIndex = 0; tIndex < tCount; ++tIndex) {
+ entry = tpc.partition(accessor, tIndex, tableSize);
+
+ groupTupleBuilder.reset();
+// System.out.print("\nBTuple : " + entry);
+ for (int k = 0; k < keys.length; k++) {
+// getField(accessor, tIndex, keys[k]);
+ groupTupleBuilder.addField(accessor, tIndex, keys[k]);
+ }
+ for (int d = 0; d < decors.length; d++) {
+// getField(accessor, tIndex, decors[d]);
+ groupTupleBuilder.addField(accessor, tIndex, decors[d]);
+ }
+ groupTupleBuilder.getDataOutput().writeInt(-1);
+ groupTupleBuilder.addFieldEndOffset();
+ groupTupleBuilder.getDataOutput().writeInt(-1);
+ groupTupleBuilder.addFieldEndOffset();
+
+ if (!appender.appendSkipEmptyField(groupTupleBuilder.getFieldEndOffsets(),
+ groupTupleBuilder.getByteArray(), 0, groupTupleBuilder.getSize())) {
+ addNewBuffer(false);
+ if (!appender.appendSkipEmptyField(groupTupleBuilder.getFieldEndOffsets(),
+ groupTupleBuilder.getByteArray(), 0, groupTupleBuilder.getSize())) {
+ throw new HyracksDataException("Cannot init the aggregate state in a single frame.");
+ }
+ }
+
+ storedTuplePointer.frameIndex = lastBIndex;
+ storedTuplePointer.tupleIndex = appender.getTupleCount() - 1;
+ hashTable.insert(entry, storedTuplePointer);
+ }
+ }
+
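+ // For each probe tuple, looks up the matching group via the join keys; the first match initializes an
+ // aggregate state and records its location in the group tuple, later matches aggregate into that state.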
+ public void insert(FrameTupleAccessor accessor, ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount0 = accessor.getTupleCount();
+ int entry, offset;
+ boolean foundGroup = false;
+ TuplePointer storedTuplePointer = new TuplePointer();
+ TuplePointer stateTuplePointer = new TuplePointer();
+ state = aggregator.createAggregateStates();
+
+ for (int tIndex = 0; tIndex < tupleCount0; ++tIndex) {
+ entry = tpc1.partition(accessor, tIndex, tableSize);
+
+ foundGroup = false;
+ offset = 0;
+ do {
+ hashTable.getTuplePointer(entry, offset++, storedTuplePointer);
+ if (storedTuplePointer.frameIndex < 0)
+ break;
+ storedKeysAccessor.reset(buffers.get(storedTuplePointer.frameIndex));
+
+ int c = ftpc1.compare(storedKeysAccessor, storedTuplePointer.tupleIndex, accessor, tIndex);
+ if (c == 0) {
+ foundGroup = true;
+ break;
+ }
+ } while (true);
+
+ if (foundGroup) {
+// System.out.print("\nITuple : " + entry);
+// getField(accessor, tIndex, storedKeys[0]);
+ stateTuplePointer = getAggregateIndex(storedKeysAccessor, storedTuplePointer.tupleIndex);
+
+ if(stateTuplePointer.frameIndex < 0) {
+ outputTupleBuilder.reset();
+
+ for (int k = 0; k < keys.length; k++) {
+ outputTupleBuilder.addField(storedKeysAccessor, storedTuplePointer.tupleIndex, storedKeys[k]);
+ }
+
+ aggregator.init(outputTupleBuilder, accessor, tIndex, state);
+
+ if (!stateAppender.append(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+ addNewBuffer(true);
+ if (!stateAppender.append(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+ throw new HyracksDataException("Cannot write the state into a frame.");
+ }
+ }
+
+ stateTuplePointer.frameIndex = lastStateBIndex;
+ stateTuplePointer.tupleIndex = stateAppender.getTupleCount() - 1;
+ updateAggregateIndex(storedKeysAccessor, storedTuplePointer, stateTuplePointer);
+ }
+ else {
+ stateAccessor.reset(stateBuffers.get(stateTuplePointer.frameIndex));
+ aggregator.aggregate(accessor, tIndex, stateAccessor, stateTuplePointer.tupleIndex, state);
+// stateAccessor.prettyPrint();
+ }
+ }
+ }
+ }
+
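+ // Emits one output tuple per stored group (keys and decors plus the final aggregate). For a left outer
+ // group-join, groups that never matched get a fresh aggregate state initialized with a null accessor.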
+ public void write(IFrameWriter writer) throws HyracksDataException {
+ ByteBuffer buffer = ctx.allocateFrame();
+ ByteBuffer tmpBuffer = ctx.allocateFrame();
+ appender.reset(buffer, true);
+ TuplePointer stateTuplePointer = new TuplePointer();
+ int currentStateBuffer = -1;
+ boolean emitTuple;
+
+
+ for (int i = 0; i < buffers.size(); ++i) {
+ storedKeysAccessor.reset(buffers.get(i));
+
+ for (int tIndex = 0; tIndex < storedKeysAccessor.getTupleCount(); ++tIndex) {
+ stateTuplePointer = getAggregateIndex(storedKeysAccessor, tIndex);
+ outputTupleBuilder.reset();
+ emitTuple = false;
+
+ if(stateTuplePointer.frameIndex > -1) {
+ emitTuple = true;
+ for (int j = 0; j < keysAndDecors.length; j++) {
+ outputTupleBuilder.addField(storedKeysAccessor, tIndex, keysAndDecors[j]);
+ }
+ if (currentStateBuffer != stateTuplePointer.frameIndex) {
+ stateAccessor.reset(stateBuffers.get(stateTuplePointer.frameIndex));
+ currentStateBuffer = stateTuplePointer.frameIndex;
+ }
+ aggregator.outputFinalResult(outputTupleBuilder, stateAccessor, stateTuplePointer.tupleIndex, state);
+ }
+ else if(isLeftOuter) {
+ emitTuple=true;
+/* for (int k = 0; k < nullWriters.length; k++) {
+ nullWriters[k].writeNull(outputTupleBuilder.getDataOutput());
+ outputTupleBuilder.addFieldEndOffset();
+ }
+*/
+// tmpBuffer.clear();
+ state = aggregator.createAggregateStates();
+ stateAppender.reset(tmpBuffer, true);
+
+ outputTupleBuilder.reset();
+
+ for (int k = 0; k < storedKeys.length; k++) {
+ outputTupleBuilder.addField(storedKeysAccessor, tIndex, storedKeys[k]);
+ }
+
+ aggregator.init(outputTupleBuilder, null, 0, state);
+
+ if (!stateAppender.append(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize()))
+ throw new HyracksDataException("Cannot write the state into a frame.");
+
+ outputTupleBuilder.reset();
+
+ for (int j = 0; j < keysAndDecors.length; j++) {
+ outputTupleBuilder.addField(storedKeysAccessor, tIndex, keysAndDecors[j]);
+ }
+
+ stateAccessor.reset(tmpBuffer);
+ currentStateBuffer = -1;
+
+ aggregator.outputFinalResult(outputTupleBuilder, stateAccessor, 0, state);
+ }
+
+ if (emitTuple) {
+ if (!appender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+ writer.nextFrame(buffer);
+ appender.reset(buffer, true);
+ if (!appender.appendSkipEmptyField(outputTupleBuilder.getFieldEndOffsets(),
+ outputTupleBuilder.getByteArray(), 0, outputTupleBuilder.getSize())) {
+ throw new HyracksDataException("Cannot write groupjoin output into a frame.");
+ }
+ }
+ }
+ }
+ }
+
+ if (appender.getTupleCount() != 0) {
+ writer.nextFrame(buffer);
+ }
+ }
+
+ public void close() throws HyracksDataException {
+ buffers.clear();
+ stateBuffers.clear();
+ state = null;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashGroupJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashGroupJoinOperatorDescriptor.java
new file mode 100644
index 0000000..60abc12
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashGroupJoinOperatorDescriptor.java
@@ -0,0 +1,551 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
+import edu.uci.ics.hyracks.dataflow.common.io.RunFileWriter;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
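+/**
+ * Two-activity operator modeled after hybrid hash join: the first activity builds an in-memory group-join
+ * over the memory-resident part of the group-by input and spills the remainder into per-partition run
+ * files; the second activity streams the probe input, joining resident tuples immediately, and finally
+ * replays each spilled build/probe partition pair with a fresh in-memory group-join.
+ */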
+public class HybridHashGroupJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
+ private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
+
+ private final int memsize;
+ private static final long serialVersionUID = 1L;
+ private final int inputsize0;
+ private final double factor;
+ private final int recordsPerFrame;
+ private final int[] keys0;
+ private final int[] keys1;
+ private final int[] projectFields;
+ private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+ private final IBinaryComparatorFactory[] joinComparatorFactories;
+ private final IBinaryComparatorFactory[] groupComparatorFactories;
+ private final IAggregatorDescriptorFactory aggregatorFactory;
+ private boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
+
+ /**
+ * @param spec
+ * @param memsize
+ * in frames
+ * @param inputsize0
+ * in frames
+ * @param recordsPerFrame
+ * @param factor
+ * @param keys0
+ * @param keys1
+ * @param projectFields TODO
+ * @param hashFunctionFactories
+ * @param joinComparatorFactories
+ * @param groupComparatorFactories
+ * @param aggregatorFactory
+ * @param recordDescriptor
+ * @param isLeftOuter TODO
+ * @throws HyracksDataException
+ */
+
+ public HybridHashGroupJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, int recordsPerFrame,
+ double factor, int[] keys0, int[] keys1, int[] projectFields,
+ IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] joinComparatorFactories,
+ IBinaryComparatorFactory[] groupComparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+ super(spec, 2, 1);
+ this.memsize = memsize;
+ this.inputsize0 = inputsize0;
+ this.factor = factor;
+ this.recordsPerFrame = recordsPerFrame;
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.projectFields = projectFields;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.joinComparatorFactories = joinComparatorFactories;
+ this.groupComparatorFactories = groupComparatorFactories;
+ this.aggregatorFactory = aggregatorFactory;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
+ recordDescriptors[0] = recordDescriptor;
+ }
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ ActivityId p1Aid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID);
+ ActivityId p2Aid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID);
+ BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(p1Aid, p2Aid);
+ PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(p1Aid, p2Aid);
+
+ builder.addActivity(this, phase1);
+ builder.addSourceEdge(0, phase1, 0);
+
+ builder.addActivity(this, phase2);
+ builder.addSourceEdge(1, phase2, 0);
+
+ builder.addBlockingEdge(phase1, phase2);
+
+ builder.addTargetEdge(0, phase2, 0);
+ }
+
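+ // Per-task state handed from the build/partition activity to the probe/join activity: the spilled run
+ // files, the in-memory joiner, and the partitioning/memory parameters.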
+ public static class BuildAndPartitionTaskState extends AbstractStateObject {
+ private RunFileWriter[] fWriters;
+ private InMemoryHashGroupJoin joiner;
+ private int nPartitions;
+ private int memoryForHashtable;
+
+ public BuildAndPartitionTaskState() {
+ }
+
+ private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+
+ }
+
+ }
+
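+ // First activity: consumes the group-by (build) input, keeping what fits in the in-memory hash table
+ // and writing overflow tuples to per-partition run files.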
+ private class BuildAndPartitionActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ private final ActivityId p2Aid;
+
+ public BuildAndPartitionActivityNode(ActivityId p1Aid, ActivityId p2Aid) {
+ super(p1Aid);
+ this.p2Aid = p2Aid;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(p2Aid, 0);
+ final IBinaryComparator[] comparators = new IBinaryComparator[groupComparatorFactories.length];
+ for (int i = 0; i < groupComparatorFactories.length; ++i) {
+ comparators[i] = groupComparatorFactories[i].createBinaryComparator();
+ }
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if(isLeftOuter){
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
+
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
+ .getJobId(), new TaskId(getActivityId(), partition));
+ private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(ctx.getFrameSize(), rd0);
+ private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys0,
+ hashFunctionFactories).createPartitioner();
+ private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ private final FrameTupleAppender ftappender = new FrameTupleAppender(ctx.getFrameSize());
+ private ByteBuffer[] bufferForPartitions;
+ private final ByteBuffer inBuffer = ctx.allocateFrame();
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (state.memoryForHashtable != 0)
+ build(inBuffer);
+
+ for (int i = 0; i < state.nPartitions; i++) {
+ ByteBuffer buf = bufferForPartitions[i];
+ accessorBuild.reset(buf);
+ if (accessorBuild.getTupleCount() > 0) {
+ write(i, buf);
+ }
+ closeWriter(i);
+ }
+
+ ctx.setStateObject(state);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+
+ if (state.memoryForHashtable != memsize - 2) {
+ accessorBuild.reset(buffer);
+ int tCount = accessorBuild.getTupleCount();
+ for (int i = 0; i < tCount; ++i) {
+ int entry = -1;
+ if (state.memoryForHashtable == 0) {
+ entry = hpcBuild.partition(accessorBuild, i, state.nPartitions);
+ boolean newBuffer = false;
+ ByteBuffer bufBi = bufferForPartitions[entry];
+ while (true) {
+ appender.reset(bufBi, newBuffer);
+ if (appender.append(accessorBuild, i)) {
+ break;
+ } else {
+ write(entry, bufBi);
+ bufBi.clear();
+ newBuffer = true;
+ }
+ }
+ } else {
+ entry = hpcBuild.partition(accessorBuild, i, (int) (inputsize0 * factor / nPartitions));
+ if (entry < state.memoryForHashtable) {
+ while (true) {
+ if (!ftappender.append(accessorBuild, i)) {
+ build(inBuffer);
+
+ ftappender.reset(inBuffer, true);
+ } else {
+ break;
+ }
+ }
+ } else {
+ entry %= state.nPartitions;
+ boolean newBuffer = false;
+ ByteBuffer bufBi = bufferForPartitions[entry];
+ while (true) {
+ appender.reset(bufBi, newBuffer);
+ if (appender.append(accessorBuild, i)) {
+ break;
+ } else {
+ write(entry, bufBi);
+ bufBi.clear();
+ newBuffer = true;
+ }
+ }
+ }
+ }
+
+ }
+ } else {
+ build(buffer);
+ }
+
+ }
+
+ private void build(ByteBuffer inBuffer) throws HyracksDataException {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(inBuffer, copyBuffer);
+ try{
+ state.joiner.build(copyBuffer);
+ } catch (IOException e){
+ // propagate build failures instead of swallowing them silently
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (memsize > 1) {
+ if (memsize > inputsize0) {
+ state.nPartitions = 0;
+ } else {
+ state.nPartitions = (int) (Math.ceil((double) (inputsize0 * factor / nPartitions - memsize)
+ / (double) (memsize - 1)));
+ }
+ if (state.nPartitions <= 0) {
+ // becomes in-memory HJ
+ state.memoryForHashtable = memsize - 2;
+ state.nPartitions = 0;
+ } else {
+ state.memoryForHashtable = memsize - state.nPartitions - 2;
+ if (state.memoryForHashtable < 0) {
+ state.memoryForHashtable = 0;
+ state.nPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
+ }
+ }
+ } else {
+ throw new HyracksDataException("not enough memory");
+ }
+
+ ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories);
+ ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories);
+ int tableSize = (int) (state.memoryForHashtable * recordsPerFrame * factor);
+ state.joiner = new InMemoryHashGroupJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+ new FrameTupleAccessor(ctx.getFrameSize(), rd1), groupComparatorFactories, hpcf0, hpcf1, rd0, recordDescriptors[0],
+ aggregatorFactory, keys1, keys0, projectFields, isLeftOuter, nullWriters1);
+ bufferForPartitions = new ByteBuffer[state.nPartitions];
+ state.fWriters = new RunFileWriter[state.nPartitions];
+ for (int i = 0; i < state.nPartitions; i++) {
+ bufferForPartitions[i] = ctx.allocateFrame();
+ }
+
+ ftappender.reset(inBuffer, true);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+
+ private void closeWriter(int i) throws HyracksDataException {
+ RunFileWriter writer = state.fWriters[i];
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ private void write(int i, ByteBuffer head) throws HyracksDataException {
+ RunFileWriter writer = state.fWriters[i];
+ if (writer == null) {
+ FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
+ BuildAndPartitionActivityNode.class.getSimpleName());
+ writer = new RunFileWriter(file, ctx.getIOManager());
+ writer.open();
+ state.fWriters[i] = writer;
+ }
+ writer.nextFrame(head);
+ }
+ };
+ return op;
+ }
+ }
+
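+ // Second activity: consumes the probe input, probing the in-memory table directly and spilling
+ // non-resident tuples; on close it replays every spilled partition pair with a new in-memory group-join.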
+ private class PartitionAndJoinActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ private final ActivityId p1Aid;
+
+ public PartitionAndJoinActivityNode(ActivityId p1Aid, ActivityId p2Aid) {
+ super(p2Aid);
+ this.p1Aid = p1Aid;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(p1Aid, 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ final IBinaryComparator[] comparators = new IBinaryComparator[groupComparatorFactories.length];
+ for (int i = 0; i < groupComparatorFactories.length; ++i) {
+ comparators[i] = groupComparatorFactories[i].createBinaryComparator();
+ }
+ // mirror the build activity: null writers are only needed for a left outer group-join
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if(isLeftOuter){
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
+
+ IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private BuildAndPartitionTaskState state;
+ private final FrameTupleAccessor accessorProbe = new FrameTupleAccessor(ctx.getFrameSize(), rd1);
+ private final ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0,
+ hashFunctionFactories);
+ private final ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1,
+ hashFunctionFactories);
+ private final ITuplePartitionComputer hpcProbe = hpcf1.createPartitioner();
+
+ private final FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+ private final FrameTupleAppender ftap = new FrameTupleAppender(ctx.getFrameSize());
+ private final ByteBuffer inBuffer = ctx.allocateFrame();
+ private final ByteBuffer outBuffer = ctx.allocateFrame();
+ private RunFileWriter[] buildWriters;
+ private RunFileWriter[] probeWriters;
+ private ByteBuffer[] bufferForPartitions;
+
+ @Override
+ public void open() throws HyracksDataException {
+ state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
+ BUILD_AND_PARTITION_ACTIVITY_ID), partition));
+ writer.open();
+ buildWriters = state.fWriters;
+ probeWriters = new RunFileWriter[state.nPartitions];
+ bufferForPartitions = new ByteBuffer[state.nPartitions];
+ for (int i = 0; i < state.nPartitions; i++) {
+ bufferForPartitions[i] = ctx.allocateFrame();
+ }
+ appender.reset(outBuffer, true);
+ ftap.reset(inBuffer, true);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ if (state.memoryForHashtable != memsize - 2) {
+ accessorProbe.reset(buffer);
+ int tupleCount0 = accessorProbe.getTupleCount();
+ for (int i = 0; i < tupleCount0; ++i) {
+
+ int entry = -1;
+ if (state.memoryForHashtable == 0) {
+ entry = hpcProbe.partition(accessorProbe, i, state.nPartitions);
+ boolean newBuffer = false;
+ ByteBuffer outbuf = bufferForPartitions[entry];
+ while (true) {
+ appender.reset(outbuf, newBuffer);
+ if (appender.append(accessorProbe, i)) {
+ break;
+ } else {
+ write(entry, outbuf);
+ outbuf.clear();
+ newBuffer = true;
+ }
+ }
+ } else {
+ entry = hpcProbe.partition(accessorProbe, i, (int) (inputsize0 * factor / nPartitions));
+ if (entry < state.memoryForHashtable) {
+ while (true) {
+ if (!ftap.append(accessorProbe, i)) {
+ state.joiner.join(inBuffer, writer);
+ ftap.reset(inBuffer, true);
+ } else
+ break;
+ }
+
+ } else {
+ entry %= state.nPartitions;
+ boolean newBuffer = false;
+ ByteBuffer outbuf = bufferForPartitions[entry];
+ while (true) {
+ appender.reset(outbuf, newBuffer);
+ if (appender.append(accessorProbe, i)) {
+ break;
+ } else {
+ write(entry, outbuf);
+ outbuf.clear();
+ newBuffer = true;
+ }
+ }
+ }
+ }
+ }
+ } else {
+ state.joiner.join(buffer, writer);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ state.joiner.join(inBuffer, writer);
+ state.joiner.write(writer);
+ state.joiner.closeJoin(writer);
+
+ try{
+ if (state.memoryForHashtable != memsize - 2) {
+ for (int i = 0; i < state.nPartitions; i++) {
+ ByteBuffer buf = bufferForPartitions[i];
+ accessorProbe.reset(buf);
+ if (accessorProbe.getTupleCount() > 0) {
+ write(i, buf);
+ }
+ closeWriter(i);
+ }
+
+ inBuffer.clear();
+ int tableSize = -1;
+ if (state.memoryForHashtable == 0) {
+ tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
+ } else {
+ tableSize = (int) (memsize * recordsPerFrame * factor);
+ }
+ for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
+ RunFileWriter buildWriter = buildWriters[partitionid];
+ RunFileWriter probeWriter = probeWriters[partitionid];
+ if (buildWriter == null) {
+ continue;
+ }
+ InMemoryHashGroupJoin joiner = new InMemoryHashGroupJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+ new FrameTupleAccessor(ctx.getFrameSize(), rd1), groupComparatorFactories, hpcf0, hpcf1, rd0, recordDescriptors[0],
+ aggregatorFactory, keys1, keys0, projectFields, isLeftOuter, nullWriters1);
+
+ RunFileReader buildReader = buildWriter.createReader();
+ buildReader.open();
+ while (buildReader.nextFrame(inBuffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(inBuffer, copyBuffer);
+ joiner.build(copyBuffer);
+ inBuffer.clear();
+ }
+ buildReader.close();
+
+ // probe
+ if (probeWriter != null) {
+ RunFileReader probeReader = probeWriter.createReader();
+ probeReader.open();
+ while (probeReader.nextFrame(inBuffer)) {
+ joiner.join(inBuffer, writer);
+ inBuffer.clear();
+ }
+ probeReader.close();
+ }
+ joiner.write(writer);
+ joiner.closeJoin(writer);
+ }
+ }
+ }
+ catch(Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ }
+ finally {
+ writer.close();
+ }
+ }
+
+ private void closeWriter(int i) throws HyracksDataException {
+ RunFileWriter writer = probeWriters[i];
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ private void write(int i, ByteBuffer head) throws HyracksDataException {
+ RunFileWriter writer = probeWriters[i];
+ if (writer == null) {
+ FileReference file = ctx.createManagedWorkspaceFile(PartitionAndJoinActivityNode.class
+ .getSimpleName());
+ writer = new RunFileWriter(file, ctx.getIOManager());
+ writer.open();
+ probeWriters[i] = writer;
+ }
+ writer.nextFrame(head);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+ };
+ return op;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashGroupJoin.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashGroupJoin.java
new file mode 100644
index 0000000..ff64600
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashGroupJoin.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
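+/**
+ * Thin wrapper around GroupJoinHelper: build() loads frames of the group-by side, join() probes with the
+ * other input, write() flushes the aggregated groups to the writer, and closeJoin() releases the buffers.
+ */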
+public class InMemoryHashGroupJoin {
+ private final List<ByteBuffer> buffers;
+ private final FrameTupleAccessor accessorBuild;
+ private final FrameTupleAccessor accessorProbe;
+ private final FrameTupleAppender appender;
+ private final IBinaryComparatorFactory[] comparator;
+ private final ByteBuffer outBuffer;
+ private final GroupJoinHelper gByTable;
+
+ public InMemoryHashGroupJoin(IHyracksTaskContext ctx, int tableSize, FrameTupleAccessor accessor0,
+ FrameTupleAccessor accessor1, IBinaryComparatorFactory[] groupComparator, ITuplePartitionComputerFactory gByTpc0,
+ ITuplePartitionComputerFactory gByTpc1, RecordDescriptor gByInRecordDescriptor, RecordDescriptor gByOutRecordDescriptor,
+ IAggregatorDescriptorFactory aggregatorFactory, int[] joinAttributes, int[] groupAttributes,
+ int[] decorAttributes, boolean isLeftOuter, INullWriter[] nullWriters1) throws HyracksDataException {
+ buffers = new ArrayList<ByteBuffer>();
+ this.accessorBuild = accessor0;
+ this.accessorProbe = accessor1;
+ appender = new FrameTupleAppender(ctx.getFrameSize());
+ this.comparator = groupComparator;
+
+ outBuffer = ctx.allocateFrame();
+ appender.reset(outBuffer, true);
+
+ gByTable = new GroupJoinHelper(ctx, groupAttributes, joinAttributes, decorAttributes, comparator, gByTpc0,
+ gByTpc1, aggregatorFactory, gByInRecordDescriptor, gByOutRecordDescriptor, isLeftOuter, nullWriters1, tableSize);
+ }
+
+ public void build(ByteBuffer buffer) throws HyracksDataException, IOException {
+ buffers.add(buffer);
+ gByTable.build(accessorBuild, buffer);
+ }
+
+ public void join(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+ gByTable.insert(accessorProbe, buffer);
+ }
+
+ public void write(IFrameWriter writer) throws HyracksDataException {
+ gByTable.write(writer);
+ }
+ public void closeJoin(IFrameWriter writer) throws HyracksDataException {
+ if (appender.getTupleCount() > 0) {
+ flushFrame(outBuffer, writer);
+ }
+ gByTable.close();
+ }
+
+ private void flushFrame(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ writer.nextFrame(buffer);
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashGroupJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashGroupJoinOperatorDescriptor.java
new file mode 100644
index 0000000..d03af17
--- /dev/null
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashGroupJoinOperatorDescriptor.java
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.std.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractStateObject;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
+
+public class InMemoryHashGroupJoinOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final int BUILD_ACTIVITY_ID = 0;
+ private static final int PROBE_ACTIVITY_ID = 1;
+ private static final int OUTPUT_ACTIVITY_ID = 2;
+
+ private static final long serialVersionUID = 1L;
+ private final int[] keys0;
+ private final int[] keys1;
+ private final int[] projectFields;
+ private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+ private final IBinaryComparatorFactory[] joinComparatorFactories;
+ private final IBinaryComparatorFactory[] groupComparatorFactories;
+ private final IAggregatorDescriptorFactory aggregatorFactory;
+ private final boolean isLeftOuter;
+ private final INullWriterFactory[] nullWriterFactories1;
+ private final int tableSize;
+
+ public InMemoryHashGroupJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
+ int[] projectFields, IBinaryHashFunctionFactory[] hashFunctionFactories,
+ IBinaryComparatorFactory[] joinComparatorFactories, IBinaryComparatorFactory[] groupComparatorFactories,
+ IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter,
+ INullWriterFactory[] nullWriterFactories1, int tableSize) {
+ super(spec, 2, 1);
+ this.keys0 = keys0;
+ this.keys1 = keys1;
+ this.projectFields = projectFields;
+ this.hashFunctionFactories = hashFunctionFactories;
+ this.joinComparatorFactories = joinComparatorFactories;
+ this.groupComparatorFactories = groupComparatorFactories;
+ this.aggregatorFactory = aggregatorFactory;
+ recordDescriptors[0] = recordDescriptor;
+ this.isLeftOuter = isLeftOuter;
+ this.nullWriterFactories1 = nullWriterFactories1;
+ this.tableSize = tableSize;
+ }
+
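+ /*
+ * The operator is split into three activities: the build activity consumes
+ * input 0 and populates the in-memory group-join table, the probe activity
+ * consumes input 1 against that table, and the output activity flushes the
+ * aggregated groups. The blocking edges below enforce build -> probe -> output.
+ */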
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ ActivityId hbaId = new ActivityId(odId, BUILD_ACTIVITY_ID);
+ ActivityId hpaId = new ActivityId(odId, PROBE_ACTIVITY_ID);
+ ActivityId oaId = new ActivityId(odId, OUTPUT_ACTIVITY_ID);
+ HashBuildActivityNode hba = new HashBuildActivityNode(hbaId, hpaId);
+ HashProbeActivityNode hpa = new HashProbeActivityNode(hbaId, hpaId);
+ OutputActivity oa = new OutputActivity(oaId);
+
+ builder.addActivity(this, hba);
+ builder.addSourceEdge(0, hba, 0);
+
+ builder.addActivity(this, hpa);
+ builder.addSourceEdge(1, hpa, 0);
+
+ builder.addActivity(this, oa);
+ builder.addTargetEdge(0, oa, 0);
+
+ builder.addBlockingEdge(hba, hpa);
+ builder.addBlockingEdge(hpa, oa);
+
+ }
+
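+ /*
+ * Task-local state handed from the build activity to the probe and output
+ * activities through the context's state-object mechanism; it is never
+ * serialized, hence the empty toBytes()/fromBytes().
+ */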
+ public static class HashBuildTaskState extends AbstractStateObject {
+ private InMemoryHashGroupJoin joiner;
+
+ public HashBuildTaskState() {
+ }
+
+ private HashBuildTaskState(JobId jobId, TaskId taskId) {
+ super(jobId, taskId);
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+
+ }
+ }
+
+ private class HashBuildActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ private final ActivityId hpaId;
+
+ public HashBuildActivityNode(ActivityId hbaId, ActivityId hpaId) {
+ super(hbaId);
+ this.hpaId = hpaId;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(hpaId, 0);
+
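+ // NOTE: the join comparators are instantiated here but are not handed to the
+ // in-memory joiner below, which is constructed with the group comparator factories.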
+ final IBinaryComparator[] comparators = new IBinaryComparator[joinComparatorFactories.length];
+ for (int i = 0; i < joinComparatorFactories.length; ++i) {
+ comparators[i] = joinComparatorFactories[i].createBinaryComparator();
+ }
+
+ final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
+ if(isLeftOuter){
+ for (int i = 0; i < nullWriterFactories1.length; i++) {
+ nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
+ }
+ }
+ IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+ private HashBuildTaskState state;
+
+ @Override
+ public void open() throws HyracksDataException {
+ ITuplePartitionComputerFactory hpcf0 = new FieldHashPartitionComputerFactory(keys0, hashFunctionFactories);
+ ITuplePartitionComputerFactory hpcf1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories);
+ state = new HashBuildTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
+ partition));
+
+ state.joiner = new InMemoryHashGroupJoin(ctx, tableSize, new FrameTupleAccessor(ctx.getFrameSize(), rd0),
+ new FrameTupleAccessor(ctx.getFrameSize(), rd1), groupComparatorFactories, hpcf0 /*gByTpc0*/, hpcf1 /*gByTpc1*/, rd0,
+ recordDescriptors[0], aggregatorFactory, keys1, keys0, projectFields, isLeftOuter, nullWriters1);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ByteBuffer copyBuffer = ctx.allocateFrame();
+ FrameUtils.copy(buffer, copyBuffer);
+ try {
+ state.joiner.build(copyBuffer);
+ } catch (IOException e) {
+ // Do not swallow build failures; surface them to the framework.
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ ctx.setStateObject(state);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+ };
+ return op;
+ }
+
+ }
+
+ private class HashProbeActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ private final ActivityId hbaId;
+
+ public HashProbeActivityNode(ActivityId hbaId, ActivityId hpaId) {
+ super(hpaId);
+ this.hbaId = hbaId;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private HashBuildTaskState state;
+
+ @Override
+ public void open() throws HyracksDataException {
+ state = (HashBuildTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
+ BUILD_ACTIVITY_ID), partition));
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ // Feed probe-side frames into the shared group-join table; the aggregated
+ // result is emitted later by the output activity.
+ state.joiner.join(buffer, writer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ ctx.setStateObject(state);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ throw new HyracksDataException("InMemoryHashGroupJoinOperator has failed.");
+ }
+
+ };
+ return op;
+ }
+
+ }
+
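+ /*
+ * Source activity that runs after the probe phase: it looks up the shared
+ * build-side state, writes the aggregated group-join result to the frame
+ * writer, and releases the in-memory table.
+ */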
+ private class OutputActivity extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public OutputActivity(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @Override
+ public void initialize() throws HyracksDataException {
+ HashBuildTaskState buildState = (HashBuildTaskState) ctx.getStateObject(new TaskId(
+ new ActivityId(getOperatorId(), BUILD_ACTIVITY_ID), partition));
+ InMemoryHashGroupJoin table = buildState.joiner;
+ writer.open();
+ try {
+ table.write(writer);
+ } catch (Exception e) {
+ writer.fail();
+ throw new HyracksDataException(e);
+ } finally {
+ table.closeJoin(writer);
+ writer.close();
+ }
+
+ }
+ };
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderGroupJoinTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderGroupJoinTest.java
new file mode 100644
index 0000000..9e0a265
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderGroupJoinTest.java
@@ -0,0 +1,447 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashSpillableTableFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.CountFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.HybridHashGroupJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashGroupJoinOperatorDescriptor;
+
+public class TPCHCustomerOrderGroupJoinTest extends AbstractIntegrationTest {
+ private static final Logger LOGGER = Logger.getLogger(TPCHCustomerOrderGroupJoinTest.class.getName());
+
+ private static class NoopNullWriterFactory implements INullWriterFactory {
+
+ private static final long serialVersionUID = 1L;
+ public static final NoopNullWriterFactory INSTANCE = new NoopNullWriterFactory();
+
+ private NoopNullWriterFactory() {
+ }
+
+ @Override
+ public INullWriter createNullWriter() {
+ return new INullWriter() {
+ @Override
+ public void writeNull(DataOutput out) throws HyracksDataException {
+ try {
+ out.writeInt(0);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+ }
+
+ /*
+ * TPCH Customer table:
+ * CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL,
+ * C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL,
+ * C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL );
+ *
+ * TPCH Orders table:
+ * CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL,
+ * O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL,
+ * O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL,
+ * O_COMMENT VARCHAR(79) NOT NULL );
+ */
+
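+ /*
+ * Scans CUSTOMER (build/group side, keyed on C_CUSTKEY) and ORDERS (probe side,
+ * keyed on O_CUSTKEY), runs them through the in-memory hash group-join with a
+ * COUNT aggregate and left-outer semantics, re-groups the (custkey, count)
+ * output on the count column, and writes the result to a temporary file.
+ */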
+ @Test
+ public void customerOrderInMemoryGroupJoin() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("*customerOrderInMemoryGroupJoin start");
+ }
+ long runTime = System.currentTimeMillis();
+ JobSpecification spec = new JobSpecification();
+
+ String custTable = "data/tpch0.001/customer.tbl";
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ custTable))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+ IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ String ordTable = "data/tpch0.001/orders.tbl";
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ ordTable))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+ RecordDescriptor custOrderGroupJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[1];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
+ InMemoryHashGroupJoinOperatorDescriptor groupJoin = new InMemoryHashGroupJoinOperatorDescriptor(
+ spec,
+ new int[] { 0 },
+ new int[] { 1 },
+ new int[] { },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+ custOrderGroupJoinDesc,
+ true,
+ nullWriterFactories,
+ 128);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, groupJoin, NC1_ID);
+
+ ExternalGroupOperatorDescriptor finalGroup = new ExternalGroupOperatorDescriptor(
+ spec,
+ new int[] { 1 },
+ 4,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(IntegerPointable.FACTORY) },
+ new IntegerNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false) }),
+ custOrderGroupJoinDesc,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(IntegerPointable.FACTORY) }),
+ 128), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, finalGroup, NC1_ID);
+
+ IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+ createTempFile().getAbsolutePath()) });
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, "|");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, groupJoin, 0);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, groupJoin, 1);
+
+ IConnectorDescriptor finalGroupConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(finalGroupConn, groupJoin, 0, finalGroup, 0);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, finalGroup, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+
+ runTime = System.currentTimeMillis() - runTime;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("*customerOrderInMemoryGroupJoin stop. run time: " + runTime);
+ }
+ }
+
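+ /*
+ * Same pipeline as customerOrderInMemoryGroupJoin, but exercising the hybrid
+ * hash group-join operator (see the note at the operator construction below
+ * regarding the Grace variant).
+ */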
+ @Test
+ public void customerOrderGraceHashGroupJoin() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("*customerOrderGraceHashGroupJoin start");
+ }
+ long runTime = System.currentTimeMillis();
+ JobSpecification spec = new JobSpecification();
+
+ String custTable = "data/tpch0.001/customer.tbl";
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ custTable))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+ IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ String ordTable = "data/tpch0.001/orders.tbl";
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ ordTable))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+ RecordDescriptor custOrderGroupJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[1];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
+ // NOTE: despite the test name, the hybrid hash variant is used here; the Grace
+ // hash descriptor remains commented out below.
+ HybridHashGroupJoinOperatorDescriptor groupJoin = new HybridHashGroupJoinOperatorDescriptor(
+// GraceHashGroupJoinOperatorDescriptor groupJoin = new GraceHashGroupJoinOperatorDescriptor(
+ spec,
+ 5,
+ 128,
+ 200,
+ 1.2,
+ new int[] { 0 },
+ new int[] { 1 },
+ new int[]{ },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+ custOrderGroupJoinDesc, true, nullWriterFactories);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, groupJoin, NC1_ID);
+
+ ExternalGroupOperatorDescriptor finalGroup = new ExternalGroupOperatorDescriptor(
+ spec,
+ new int[] { 1 },
+ 4,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(IntegerPointable.FACTORY) },
+ new IntegerNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false) }),
+ custOrderGroupJoinDesc,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(IntegerPointable.FACTORY) }),
+ 128), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, finalGroup, NC1_ID);
+
+ IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+ createTempFile().getAbsolutePath()) });
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, "|");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, groupJoin, 0);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, groupJoin, 1);
+
+ IConnectorDescriptor finalGroupConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(finalGroupConn, groupJoin, 0, finalGroup, 0);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, finalGroup, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+
+ runTime = System.currentTimeMillis() - runTime;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("*customerOrderGraceHashGroupJoin stop. run time: " + runTime);
+ }
+ }
+
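+ /*
+ * Same pipeline again, driving the hybrid hash group-join operator with the
+ * same memory and partitioning parameters as the test above.
+ */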
+ @Test
+ public void customerOrderHybridHashGroupJoin() throws Exception {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("*customerOrderHybridHashGroupJoin start");
+ }
+ long runTime = System.currentTimeMillis();
+ JobSpecification spec = new JobSpecification();
+
+ String custTable = "data/tpch0.001/customer.tbl";
+ FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+ custTable))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+ IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID);
+
+ String ordTable = "data/tpch0.001/orders.tbl";
+ FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new FileReference(new File(
+ ordTable))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID);
+
+ RecordDescriptor custOrderGroupJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[1];
+ for (int j = 0; j < nullWriterFactories.length; j++) {
+ nullWriterFactories[j] = NoopNullWriterFactory.INSTANCE;
+ }
+
+ HybridHashGroupJoinOperatorDescriptor groupJoin = new HybridHashGroupJoinOperatorDescriptor(
+ spec,
+ 5,
+ 128,
+ 200,
+ 1.2,
+ new int[] { 0 },
+ new int[] { 1 },
+ new int[]{ },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+ custOrderGroupJoinDesc, true, nullWriterFactories);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, groupJoin, NC1_ID);
+
+ ExternalGroupOperatorDescriptor finalGroup = new ExternalGroupOperatorDescriptor(
+ spec,
+ new int[] { 1 },
+ 4,
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(IntegerPointable.FACTORY) },
+ new IntegerNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new CountFieldAggregatorFactory(false) }),
+ new MultiFieldsAggregatorFactory( new IFieldAggregateDescriptorFactory[] { new IntSumFieldAggregatorFactory(1, false) }),
+ custOrderGroupJoinDesc,
+ new HashSpillableTableFactory(
+ new FieldHashPartitionComputerFactory(
+ new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(IntegerPointable.FACTORY) }),
+ 128), true);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, finalGroup, NC1_ID);
+
+ IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+ createTempFile().getAbsolutePath()) });
+ IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, "|");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
+
+ IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(custJoinConn, custScanner, 0, groupJoin, 0);
+
+ IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(ordJoinConn, ordScanner, 0, groupJoin, 1);
+
+ IConnectorDescriptor finalGroupConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(finalGroupConn, groupJoin, 0, finalGroup, 0);
+
+ IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, finalGroup, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+
+ runTime = System.currentTimeMillis() - runTime;
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("*customerOrderHybridHashGroupJoin stop. run time: " + runTime);
+ }
+ }
+}
\ No newline at end of file