Merge branch 'master' into zheilbron/hyracks_msr_demo
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalExpression.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalExpression.java
index 84d7fab..c4ae9bc 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalExpression.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalExpression.java
@@ -67,4 +67,6 @@
public boolean splitIntoConjuncts(List<Mutable<ILogicalExpression>> conjs);
public abstract ILogicalExpression cloneExpression();
+
+ public boolean isFunctional();
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index c11dd80..d91cac0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -32,6 +33,10 @@
public interface ILogicalOperator {
+ public LogicalOperatorTag getOperatorTag();
+
+ public ExecutionMode getExecutionMode();
+
public List<Mutable<ILogicalOperator>> getInputs();
boolean hasInputs();
@@ -89,7 +94,7 @@
public IPhysicalPropertiesVector getDeliveredPhysicalProperties();
public void computeDeliveredPhysicalProperties(IOptimizationContext context) throws AlgebricksException;
-
+
/**
* Indicates whether the expressions used by this operator must be variable reference expressions.
*/
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java
index 0c19c79..b8a9c87 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java
@@ -323,4 +323,18 @@
}
}
+ @Override
+ public boolean isFunctional() {
+ if (!finfo.isFunctional()) {
+ return false;
+ }
+
+ for (Mutable<ILogicalExpression> e : arguments) {
+ if (!e.getValue().isFunctional()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractLogicalExpression.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractLogicalExpression.java
index abc46c0..2718076 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractLogicalExpression.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractLogicalExpression.java
@@ -36,4 +36,9 @@
// do nothing
}
+ @Override
+ public boolean isFunctional() {
+ return true;
+ }
+
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java
index 0482133..1a3fe47 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java
@@ -170,4 +170,5 @@
public boolean splitIntoConjuncts(List<Mutable<ILogicalExpression>> conjs) {
return false;
}
+
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AbstractFunctionInfo.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AbstractFunctionInfo.java
new file mode 100644
index 0000000..fe47def
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AbstractFunctionInfo.java
@@ -0,0 +1,20 @@
+package edu.uci.ics.hyracks.algebricks.core.algebra.functions;
+
+import java.io.Serializable;
+
+public abstract class AbstractFunctionInfo implements IFunctionInfo, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final boolean isFunctional;
+
+ protected AbstractFunctionInfo(boolean isFunctional) {
+ this.isFunctional = isFunctional;
+ }
+
+ @Override
+ public boolean isFunctional() {
+ return isFunctional;
+ }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java
index 4c294a7..02cd6de 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java
@@ -14,7 +14,8 @@
*/
package edu.uci.ics.hyracks.algebricks.core.algebra.functions;
-
public interface IFunctionInfo {
FunctionIdentifier getFunctionIdentifier();
+
+ public boolean isFunctional();
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index a7e8479..78efd0d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -69,6 +69,7 @@
// outputs = new ArrayList<LogicalOperatorReference>();
}
+ @Override
public abstract LogicalOperatorTag getOperatorTag();
public ExecutionMode getExecutionMode() {
@@ -182,7 +183,7 @@
return new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getNullableTypeComputer(),
ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
}
-
+
@Override
public boolean requiresVariableReferenceExpressions() {
return true;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/EmptyTupleSourceOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/EmptyTupleSourceOperator.java
index 710e02f..5872e6d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/EmptyTupleSourceOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/EmptyTupleSourceOperator.java
@@ -29,12 +29,6 @@
public class EmptyTupleSourceOperator extends AbstractLogicalOperator {
- // public final static EmptyTupleSourceOperator INSTANCE = new
- // EmptyTupleSourceOperator();
-
- public EmptyTupleSourceOperator() {
- }
-
@Override
public LogicalOperatorTag getOperatorTag() {
return LogicalOperatorTag.EMPTYTUPLESOURCE;
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletFunction.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletFunction.java
index 47148ac..ba5c5aa 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletFunction.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletFunction.java
@@ -14,13 +14,16 @@
*/
package edu.uci.ics.hyracks.algebricks.examples.piglet.metadata;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AbstractFunctionInfo;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-public class PigletFunction implements IFunctionInfo {
+public class PigletFunction extends AbstractFunctionInfo {
+ private static final long serialVersionUID = 1L;
+
private final FunctionIdentifier fid;
public PigletFunction(FunctionIdentifier fid) {
+ super(true);
this.fid = fid;
}
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
index 767f458..1c2cfae 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
@@ -32,7 +32,7 @@
import edu.uci.ics.hyracks.algebricks.rewriter.rules.InlineVariablesRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushLimitDownRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.CopyLimitDownRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectDownRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectIntoDataSourceScanRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSelectDownRule;
@@ -106,13 +106,13 @@
physicalPlanRewrites.add(new SetAlgebricksPhysicalOperatorsRule());
physicalPlanRewrites.add(new EnforceStructuralPropertiesRule());
physicalPlanRewrites.add(new PushProjectDownRule());
- physicalPlanRewrites.add(new PushLimitDownRule());
+ physicalPlanRewrites.add(new CopyLimitDownRule());
return physicalPlanRewrites;
}
public final static List<IAlgebraicRewriteRule> buildPhysicalRewritesTopLevelRuleCollection() {
List<IAlgebraicRewriteRule> physicalPlanRewrites = new LinkedList<IAlgebraicRewriteRule>();
- physicalPlanRewrites.add(new PushLimitDownRule());
+ physicalPlanRewrites.add(new CopyLimitDownRule());
return physicalPlanRewrites;
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
new file mode 100644
index 0000000..2f080fb
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class CopyLimitDownRule implements IAlgebraicRewriteRule {
+
+ @Override
+ public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+ return false;
+ }
+
+ @Override
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+ if (op.getOperatorTag() != LogicalOperatorTag.LIMIT) {
+ return false;
+ }
+ LimitOperator limitOp = (LimitOperator) op;
+ if (!limitOp.isTopmostLimitOp()) {
+ return false;
+ }
+
+ List<LogicalVariable> limitUsedVars = new ArrayList<>();
+ VariableUtilities.getUsedVariables(limitOp, limitUsedVars);
+
+ Mutable<ILogicalOperator> safeOpRef = null;
+ Mutable<ILogicalOperator> candidateOpRef = limitOp.getInputs().get(0);
+
+ List<LogicalVariable> candidateProducedVars = new ArrayList<>();
+ while (true) {
+ candidateProducedVars.clear();
+ ILogicalOperator candidateOp = candidateOpRef.getValue();
+ LogicalOperatorTag candidateOpTag = candidateOp.getOperatorTag();
+ if (candidateOp.getInputs().size() > 1 || !candidateOp.isMap()
+ || candidateOpTag == LogicalOperatorTag.SELECT || candidateOpTag == LogicalOperatorTag.LIMIT
+ || !OperatorPropertiesUtil.disjoint(limitUsedVars, candidateProducedVars)) {
+ break;
+ }
+
+ safeOpRef = candidateOpRef;
+ candidateOpRef = safeOpRef.getValue().getInputs().get(0);
+ }
+
+ if (safeOpRef != null) {
+ ILogicalOperator safeOp = safeOpRef.getValue();
+ Mutable<ILogicalOperator> unsafeOpRef = safeOp.getInputs().get(0);
+ ILogicalOperator unsafeOp = unsafeOpRef.getValue();
+ LimitOperator limitCloneOp = null;
+ if (limitOp.getOffset().getValue() == null) {
+ limitCloneOp = new LimitOperator(limitOp.getMaxObjects().getValue(), false);
+ } else {
+ IFunctionInfo finfoAdd = context.getMetadataProvider().lookupFunction(
+ AlgebricksBuiltinFunctions.NUMERIC_ADD);
+ List<Mutable<ILogicalExpression>> addArgs = new ArrayList<>();
+ addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getMaxObjects().getValue().cloneExpression()));
+ addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getOffset().getValue().cloneExpression()));
+ ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
+ limitCloneOp = new LimitOperator(maxPlusOffset, false);
+ }
+ limitCloneOp.setPhysicalOperator(new StreamLimitPOperator());
+ limitCloneOp.getInputs().add(new MutableObject<ILogicalOperator>(unsafeOp));
+ limitCloneOp.setExecutionMode(unsafeOp.getExecutionMode());
+ limitCloneOp.recomputeSchema();
+ unsafeOpRef.setValue(limitCloneOp);
+ context.computeAndSetTypeEnvironmentForOperator(limitCloneOp);
+ context.addToDontApplySet(this, limitOp);
+ }
+
+ return safeOpRef != null;
+ }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index a684bc9..5263213 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -124,6 +124,11 @@
/** copy the original order-by operator and insert on-top-of the subplan operator */
context.addToDontApplySet(this, child);
OrderOperator sourceOrderOp = (OrderOperator) child;
+ for (Pair<IOrder, Mutable<ILogicalExpression>> expr : sourceOrderOp.getOrderExpressions()) {
+ if (!expr.second.getValue().isFunctional()) {
+ return false;
+ }
+ }
List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs = deepCopyOrderAndExpression(sourceOrderOp
.getOrderExpressions());
OrderOperator newOrderOp = new OrderOperator(orderExprs);
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
index 9e0d8d0..eafb3b0 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
@@ -251,7 +251,7 @@
return true;
}
} else {
- if (assignCommonExpression(exprEqClass, expr)) {
+ if (expr.isFunctional() && assignCommonExpression(exprEqClass, expr)) {
//re-obtain the live vars after rewriting in the method called in the if condition
Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
VariableUtilities.getLiveVariables(op, liveVars);
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 40b049b..aa6916d 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -45,25 +45,20 @@
* (some variables are generated by datasources).
* Inlining variables may enable other optimizations by allowing selects and assigns to be moved
* (e.g., a select may be pushed into a join to enable an efficient physical join operator).
- *
* Preconditions/Assumptions:
- * Assumes no projects are in the plan. Only inlines variables whose assigned expression is a function call
- * (i.e., this rule ignores right-hand side constants and other variable references expressions
- *
+ * Assumes no projects are in the plan. Only inlines variables whose assigned expression is a function call
+ * (i.e., this rule ignores right-hand side constants and other variable references expressions
* Postconditions/Examples:
* All qualifying variables have been inlined.
- *
* Example (simplified):
- *
* Before plan:
* select <- [$$1 < $$2 + $$0]
- * assign [$$2] <- [funcZ() + $$0]
- * assign [$$0, $$1] <- [funcX(), funcY()]
- *
+ * assign [$$2] <- [funcZ() + $$0]
+ * assign [$$0, $$1] <- [funcX(), funcY()]
* After plan:
* select <- [funcY() < funcZ() + funcX() + funcX()]
- * assign [$$2] <- [funcZ() + funcX()]
- * assign [$$0, $$1] <- [funcX(), funcY()]
+ * assign [$$2] <- [funcZ() + funcX()]
+ * assign [$$0, $$1] <- [funcX(), funcY()]
*/
public class InlineVariablesRule implements IAlgebraicRewriteRule {
@@ -73,12 +68,12 @@
// Visitor for replacing variable reference expressions with their originating expression.
protected InlineVariablesVisitor inlineVisitor = new InlineVariablesVisitor(varAssignRhs);
-
+
// Set of FunctionIdentifiers that we should not inline.
protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<FunctionIdentifier>();
-
+
protected boolean hasRun = false;
-
+
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
return false;
@@ -100,12 +95,12 @@
hasRun = true;
return modified;
}
-
+
protected void prepare(IOptimizationContext context) {
varAssignRhs.clear();
inlineVisitor.setContext(context);
}
-
+
protected boolean performBottomUpAction(AbstractLogicalOperator op) throws AlgebricksException {
// Only inline variables in operators that can deal with arbitrary expressions.
if (!op.requiresVariableReferenceExpressions()) {
@@ -118,22 +113,22 @@
protected boolean performFinalAction() throws AlgebricksException {
return false;
}
-
+
protected boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-
+
// Update mapping from variables to expressions during top-down traversal.
if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
AssignOperator assignOp = (AssignOperator) op;
List<LogicalVariable> vars = assignOp.getVariables();
- List<Mutable<ILogicalExpression>> exprs = assignOp.getExpressions();
+ List<Mutable<ILogicalExpression>> exprs = assignOp.getExpressions();
for (int i = 0; i < vars.size(); i++) {
ILogicalExpression expr = exprs.get(i).getValue();
- // Ignore functions that are in the doNotInline set.
+ // Ignore functions that are either in the doNotInline set or are non-functional
if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
- if (doNotInlineFuncs.contains(funcExpr.getFunctionIdentifier())) {
+ if (doNotInlineFuncs.contains(funcExpr.getFunctionIdentifier()) || !funcExpr.isFunctional()) {
continue;
}
}
@@ -146,13 +141,13 @@
for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
if (inlineVariables(inputOpRef, context)) {
modified = true;
- }
+ }
}
if (performBottomUpAction(op)) {
modified = true;
}
-
+
if (modified) {
context.computeAndSetTypeEnvironmentForOperator(op);
context.addToDontApplySet(this, op);
@@ -164,23 +159,23 @@
}
protected class InlineVariablesVisitor implements ILogicalExpressionReferenceTransform {
-
+
private final Map<LogicalVariable, ILogicalExpression> varAssignRhs;
private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
- private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>();
+ private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>();
private ILogicalOperator op;
private IOptimizationContext context;
// If set, only replace this variable reference.
private LogicalVariable targetVar;
-
+
public InlineVariablesVisitor(Map<LogicalVariable, ILogicalExpression> varAssignRhs) {
this.varAssignRhs = varAssignRhs;
}
-
+
public void setTargetVariable(LogicalVariable targetVar) {
this.targetVar = targetVar;
}
-
+
public void setContext(IOptimizationContext context) {
this.context = context;
}
@@ -189,9 +184,9 @@
this.op = op;
liveVars.clear();
}
-
+
@Override
- public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+ public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
ILogicalExpression e = exprRef.getValue();
switch (((AbstractLogicalExpression) e).getExpressionTag()) {
case VARIABLE: {
@@ -200,16 +195,18 @@
if (targetVar != null && var != targetVar) {
return false;
}
+
// Make sure has not been excluded from inlining.
if (context.shouldNotBeInlined(var)) {
return false;
}
+
ILogicalExpression rhs = varAssignRhs.get(var);
if (rhs == null) {
// Variable was not produced by an assign.
return false;
}
-
+
// Make sure used variables from rhs are live.
if (liveVars.isEmpty()) {
VariableUtilities.getLiveVariables(op, liveVars);
@@ -221,7 +218,7 @@
return false;
}
}
-
+
// Replace variable reference with a clone of the rhs expr.
exprRef.setValue(rhs.cloneExpression());
return true;
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
index f03f4e3..418ee36 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
@@ -36,31 +36,28 @@
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
/**
- * Pushes an AssignOperator below a UnionAll operator by creating an new AssignOperator below each of
+ * Pushes an AssignOperator below a UnionAll operator by creating an new AssignOperator below each of
* the UnionAllOperator's branches with appropriate variable replacements.
- * This rule can help to enable other rules that are difficult to fire across a UnionAllOperator,
+ * This rule can help to enable other rules that are difficult to fire across a UnionAllOperator,
* for example, eliminating common sub-expressions.
- *
* Example:
- *
* Before plan:
* ...
* assign [$$20, $$21] <- [funcA($$3), funcB($$6)]
- * union ($$1, $$2, $$3) ($$4, $$5, $$6)
- * union_branch_0
- * ...
- * union_branch_1
- * ...
- *
+ * union ($$1, $$2, $$3) ($$4, $$5, $$6)
+ * union_branch_0
+ * ...
+ * union_branch_1
+ * ...
* After plan:
* ...
* union ($$1, $$2, $$3) ($$4, $$5, $$6) ($$22, $$24, $$20) ($$23, $$25, $$21)
- * assign [$$22, $$23] <- [funcA($$1), funcB($$4)]
- * union_branch_0
- * ...
- * assign [$$24, $$25] <- [funcA($$2), funcB($$5)]
- * union_branch_1
- * ...
+ * assign [$$22, $$23] <- [funcA($$1), funcB($$4)]
+ * union_branch_0
+ * ...
+ * assign [$$24, $$25] <- [funcA($$2), funcB($$5)]
+ * union_branch_1
+ * ...
*/
public class PushAssignBelowUnionAllRule implements IAlgebraicRewriteRule {
@@ -84,6 +81,11 @@
continue;
}
AssignOperator assignOp = (AssignOperator) childOp;
+ for (Mutable<ILogicalExpression> expr : assignOp.getExpressions()) {
+ if (!expr.getValue().isFunctional()) {
+ return false;
+ }
+ }
AbstractLogicalOperator childOfChildOp = (AbstractLogicalOperator) assignOp.getInputs().get(0).getValue();
if (childOfChildOp.getOperatorTag() != LogicalOperatorTag.UNIONALL) {
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
deleted file mode 100644
index a0bebaf..0000000
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.rewriter.rules;
-
-import java.util.LinkedList;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class PushLimitDownRule implements IAlgebraicRewriteRule {
-
- @Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
- return false;
- }
-
- /**
- * When a global Limit over a merge-exchange is found, a local Limit is
- * pushed down.
- */
-
- @Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
- AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
- if (op.getOperatorTag() != LogicalOperatorTag.LIMIT) {
- return false;
- }
- LimitOperator opLim = (LimitOperator) op;
- if (!opLim.isTopmostLimitOp()) {
- return false;
- }
-
- Mutable<ILogicalOperator> opRef2 = opLim.getInputs().get(0);
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
-
- if (context.checkAndAddToAlreadyCompared(op, op2)) {
- return false;
- }
- if (op2.getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
- return false;
- }
- PhysicalOperatorTag op2PTag = op2.getPhysicalOperator().getOperatorTag();
- // we should test for any kind of merge
- if (op2PTag != PhysicalOperatorTag.RANDOM_MERGE_EXCHANGE && op2PTag != PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
- return false;
- }
-
- LinkedList<LogicalVariable> usedVars1 = new LinkedList<LogicalVariable>();
- VariableUtilities.getUsedVariables(opLim, usedVars1);
-
- do {
- if (op2.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE
- || op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE
- || op2.getOperatorTag() == LogicalOperatorTag.LIMIT) {
- return false;
- }
- if (op2.getInputs().size() > 1 || !op2.isMap()) {
- break;
- }
- LinkedList<LogicalVariable> vars2 = new LinkedList<LogicalVariable>();
- VariableUtilities.getProducedVariables(op2, vars2);
- if (!OperatorPropertiesUtil.disjoint(vars2, usedVars1)) {
- return false;
- }
- // we assume pipelineable ops. have only one input
- opRef2 = op2.getInputs().get(0);
- op2 = (AbstractLogicalOperator) opRef2.getValue();
- } while (true);
-
- LimitOperator clone2 = null;
- if (opLim.getOffset().getValue() == null) {
- clone2 = new LimitOperator(opLim.getMaxObjects().getValue(), false);
- } else {
- // push limit (max+offset)
- IFunctionInfo finfoAdd = context.getMetadataProvider().lookupFunction(
- AlgebricksBuiltinFunctions.NUMERIC_ADD);
- ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd,
- opLim.getMaxObjects(), opLim.getOffset());
- clone2 = new LimitOperator(maxPlusOffset, false);
- }
- clone2.setPhysicalOperator(new StreamLimitPOperator());
- clone2.getInputs().add(new MutableObject<ILogicalOperator>(op2));
- clone2.setExecutionMode(op2.getExecutionMode());
- clone2.recomputeSchema();
- opRef2.setValue(clone2);
- context.computeAndSetTypeEnvironmentForOperator(clone2);
- return true;
- }
-
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 06ef1097..addfb2e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -33,118 +33,136 @@
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public class AlgebricksMetaOperatorDescriptor extends
+ AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- // array of factories for building the local runtime pipeline
- private final AlgebricksPipeline pipeline;
+ // array of factories for building the local runtime pipeline
+ private final AlgebricksPipeline pipeline;
- public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity,
- IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors) {
- super(spec, inputArity, outputArity);
- if (outputArity == 1) {
- this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
- }
- this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors);
- }
+ public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ int inputArity, int outputArity,
+ IPushRuntimeFactory[] runtimeFactories,
+ RecordDescriptor[] internalRecordDescriptors) {
+ super(spec, inputArity, outputArity);
+ if (outputArity == 1) {
+ this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
+ }
+ this.pipeline = new AlgebricksPipeline(runtimeFactories,
+ internalRecordDescriptors);
+ }
- public AlgebricksPipeline getPipeline() {
- return pipeline;
- }
+ public AlgebricksPipeline getPipeline() {
+ return pipeline;
+ }
- @Override
- public JSONObject toJSON() throws JSONException {
- JSONObject json = super.toJSON();
- json.put("micro-operators", pipeline.getRuntimeFactories());
- return json;
- }
+ @Override
+ public JSONObject toJSON() throws JSONException {
+ JSONObject json = super.toJSON();
+ json.put("micro-operators", pipeline.getRuntimeFactories());
+ return json;
+ }
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Asterix { \n");
- for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
- sb.append(" " + f.toString() + ";\n");
- }
- sb.append("}");
- // sb.append(super.getInputArity());
- // sb.append(";");
- // sb.append(super.getOutputArity());
- // sb.append(";");
- return sb.toString();
- }
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Asterix { \n");
+ for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+ sb.append(" " + f.toString() + ";\n");
+ }
+ sb.append("}");
+ // sb.append(super.getInputArity());
+ // sb.append(";");
+ // sb.append(super.getOutputArity());
+ // sb.append(";");
+ return sb.toString();
+ }
- @Override
- public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- if (inputArity == 0) {
- return createSourceInputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- } else {
- return createOneInputOneOutputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- }
- }
+ @Override
+ public IOperatorNodePushable createPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ if (inputArity == 0) {
+ return createSourceInputPushRuntime(ctx, recordDescProvider,
+ partition, nPartitions);
+ } else {
+ return createOneInputOneOutputPushRuntime(ctx, recordDescProvider,
+ partition, nPartitions);
+ }
+ }
- private IOperatorNodePushable createSourceInputPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ private IOperatorNodePushable createSourceInputPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
- public void initialize() throws HyracksDataException {
- IFrameWriter startOfPipeline;
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
+ public void initialize() throws HyracksDataException {
+ IFrameWriter startOfPipeline;
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+ : null;
- PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
- pipelineOutputRecordDescriptor);
- try {
- startOfPipeline = pa.assemblePipeline(writer, ctx);
- } catch (AlgebricksException e) {
- throw new HyracksDataException(e);
- }
- startOfPipeline.open();
- startOfPipeline.close();
- }
- };
- }
+ PipelineAssembler pa = new PipelineAssembler(pipeline,
+ inputArity, outputArity, null,
+ pipelineOutputRecordDescriptor);
+ try {
+ startOfPipeline = pa.assemblePipeline(writer, ctx);
+ } catch (AlgebricksException e) {
+ throw new HyracksDataException(e);
+ }
+ startOfPipeline.open();
+ startOfPipeline.close();
+ }
+ };
+ }
- private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
- final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+ private IOperatorNodePushable createOneInputOneOutputPushRuntime(
+ final IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, int partition,
+ int nPartitions) {
+ return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
- private IFrameWriter startOfPipeline;
+ private IFrameWriter startOfPipeline;
- @Override
- public void open() throws HyracksDataException {
- if (startOfPipeline == null) {
- RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
- : null;
- RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
- AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
- PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
- pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
- try {
- startOfPipeline = pa.assemblePipeline(writer, ctx);
- } catch (AlgebricksException ae) {
- throw new HyracksDataException(ae);
- }
- }
- startOfPipeline.open();
- }
+ @Override
+ public void open() throws HyracksDataException {
+ if (startOfPipeline == null) {
+ RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+ : null;
+ RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider
+ .getInputRecordDescriptor(
+ AlgebricksMetaOperatorDescriptor.this
+ .getActivityId(), 0);
+ PipelineAssembler pa = new PipelineAssembler(pipeline,
+ inputArity, outputArity,
+ pipelineInputRecordDescriptor,
+ pipelineOutputRecordDescriptor);
+ try {
+ startOfPipeline = pa.assemblePipeline(writer, ctx);
+ } catch (AlgebricksException ae) {
+ throw new HyracksDataException(ae);
+ }
+ }
+ startOfPipeline.open();
+ }
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- startOfPipeline.nextFrame(buffer);
- }
+ @Override
+ public void nextFrame(ByteBuffer buffer)
+ throws HyracksDataException {
+ startOfPipeline.nextFrame(buffer);
+ }
- @Override
- public void close() throws HyracksDataException {
- startOfPipeline.close();
- }
+ @Override
+ public void close() throws HyracksDataException {
+ startOfPipeline.close();
+ }
- @Override
- public void fail() throws HyracksDataException {
- startOfPipeline.fail();
- }
- };
- }
+ @Override
+ public void fail() throws HyracksDataException {
+ startOfPipeline.fail();
+ }
+ };
+ }
}
diff --git a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/logical/expression/HiveFunctionInfo.java b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/logical/expression/HiveFunctionInfo.java
index 4f4ca90..95b63b7 100644
--- a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/logical/expression/HiveFunctionInfo.java
+++ b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/logical/expression/HiveFunctionInfo.java
@@ -12,39 +12,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hivesterix.logical.expression;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-
-public class HiveFunctionInfo implements IFunctionInfo, Serializable {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * primary function identifier
- */
- private transient FunctionIdentifier fid;
-
- /**
- * secondary function identifier: function name
- */
- private transient Object secondaryFid;
-
- public HiveFunctionInfo(FunctionIdentifier fid, Object secondFid) {
- this.fid = fid;
- this.secondaryFid = secondFid;
- }
-
- @Override
- public FunctionIdentifier getFunctionIdentifier() {
- return fid;
- }
-
- public Object getInfo() {
- return secondaryFid;
- }
-
-}
+package edu.uci.ics.hivesterix.logical.expression;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AbstractFunctionInfo;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class HiveFunctionInfo extends AbstractFunctionInfo {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * primary function identifier
+ */
+ private transient FunctionIdentifier fid;
+
+ /**
+ * secondary function identifier: function name
+ */
+ private transient Object secondaryFid;
+
+ public HiveFunctionInfo(FunctionIdentifier fid, Object secondFid) {
+ super(true);
+ this.fid = fid;
+ this.secondaryFid = secondFid;
+ }
+
+ @Override
+ public FunctionIdentifier getFunctionIdentifier() {
+ return fid;
+ }
+
+ public Object getInfo() {
+ return secondaryFid;
+ }
+
+}
diff --git a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
index 12b5986..1f31e44 100644
--- a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
+++ b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
@@ -38,7 +38,7 @@
import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByCombinerRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushLimitDownRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.CopyLimitDownRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectDownRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectIntoDataSourceScanRule;
import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSelectDownRule;
@@ -107,7 +107,7 @@
PHYSICAL_PLAN_REWRITES.add(new EnforceStructuralPropertiesRule());
PHYSICAL_PLAN_REWRITES.add(new PushProjectDownRule());
PHYSICAL_PLAN_REWRITES.add(new SetAlgebricksPhysicalOperatorsRule());
- PHYSICAL_PLAN_REWRITES.add(new PushLimitDownRule());
+ PHYSICAL_PLAN_REWRITES.add(new CopyLimitDownRule());
PHYSICAL_PLAN_REWRITES.add(new InsertProjectBeforeWriteRule());
PHYSICAL_PLAN_REWRITES.add(new InsertProjectBeforeUnionRule());
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index 7a795da..015e3a9 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -58,4 +58,5 @@
* @return The Cluster Controller Context.
*/
public ICCContext getCCContext();
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java
index 51db13e..7225969 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java
@@ -3,9 +3,15 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
+<<<<<<< HEAD
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+=======
*
* http://www.apache.org/licenses/LICENSE-2.0
*
+>>>>>>> master
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
index 3d70a40..c80f6d1 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.api.application;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
import edu.uci.ics.hyracks.api.resources.memory.IMemoryManager;
/**
@@ -24,7 +25,14 @@
*/
public interface INCApplicationContext extends IApplicationContext {
/**
- * Gets the node Id of the Node Congtroller.
+ * Gets the life cycle component manager of the Node Controller.
+ *
+ * @return
+ */
+ public ILifeCycleComponentManager getLifeCycleComponentManager();
+
+ /**
+ * Gets the node Id of the Node Controller.
*
* @return the Node Id.
*/
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 55d80e2..9ff741e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -31,6 +31,7 @@
GET_CLUSTER_TOPOLOGY,
CREATE_JOB,
GET_JOB_STATUS,
+ GET_JOB_INFO,
START_JOB,
GET_DATASET_DIRECTORY_SERIVICE_INFO,
GET_DATASET_RESULT_STATUS,
@@ -76,6 +77,25 @@
}
}
+ public static class GetJobInfoFunction extends Function {
+ private static final long serialVersionUID = 1L;
+
+ private final JobId jobId;
+
+ public GetJobInfoFunction(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public FunctionId getFunctionId() {
+ return FunctionId.GET_JOB_INFO;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+ }
+
public static class StartJobFunction extends Function {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 2bac49f..98f27f2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
@@ -103,4 +104,11 @@
deploymentId);
rpci.call(ipcHandle, dbf);
}
+
+ @Override
+ public JobInfo getJobInfo(JobId jobId) throws Exception {
+ HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = new HyracksClientInterfaceFunctions.GetJobInfoFunction(
+ jobId);
+ return (JobInfo) rpci.call(ipcHandle, gjsf);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 98836cd..1916360 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -36,6 +36,7 @@
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
@@ -148,7 +149,7 @@
binaryURLs.add(new URL(url));
}
}
- /**deploy the URLs to the CC and NCs*/
+ /** deploy the URLs to the CC and NCs */
hci.deployBinary(binaryURLs, deploymentId);
return deploymentId;
}
@@ -176,4 +177,9 @@
EnumSet<JobFlag> jobFlags) throws Exception {
return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
}
+
+ @Override
+ public JobInfo getJobInfo(JobId jobId) throws Exception {
+ return hci.getJobInfo(jobId);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 2820cdf..1e44e91 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
@@ -44,6 +45,16 @@
public JobStatus getJobStatus(JobId jobId) throws Exception;
/**
+ * Gets detailed information about the specified Job.
+ *
+ * @param jobId
+ * JobId of the Job
+ * @return {@link JobStatus}
+ * @throws Exception
+ */
+ public JobInfo getJobInfo(JobId jobId) throws Exception;
+
+ /**
* Start the specified Job.
*
* @param appName
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 737152b..d0eeada 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -23,6 +23,7 @@
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
@@ -46,4 +47,7 @@
public void unDeployBinary(DeploymentId deploymentId) throws Exception;
public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
+
+ public JobInfo getJobInfo(JobId jobId) throws Exception;
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 79efaa0..48d7275 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -43,6 +43,11 @@
}
@Override
+ public JobSpecification getJobSpecification() {
+ return spec;
+ }
+
+ @Override
public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
final ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException {
final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
index 978348b..99a0712 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -23,4 +23,6 @@
public interface IActivityClusterGraphGeneratorFactory extends Serializable {
public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException;
+
+ public JobSpecification getJobSpecification();
}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
new file mode 100644
index 0000000..812a32a
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
@@ -0,0 +1,72 @@
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+
+public class JobInfo implements Serializable{
+
+ private final JobId jobId;
+
+ private JobStatus status;
+
+ private List<Exception> exceptions;
+
+ private JobStatus pendingStatus;
+
+ private List<Exception> pendingExceptions;
+
+ private Map<OperatorDescriptorId, List<String>> operatorLocations;
+
+ public JobInfo(JobId jobId, JobStatus jobStatus, Map<OperatorDescriptorId, List<String>> operatorLocations) {
+ this.jobId = jobId;
+ this.operatorLocations = operatorLocations;
+ this.status = status;
+ }
+
+ public JobStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(JobStatus status) {
+ this.status = status;
+ }
+
+ public List<Exception> getExceptions() {
+ return exceptions;
+ }
+
+ public void setExceptions(List<Exception> exceptions) {
+ this.exceptions = exceptions;
+ }
+
+ public JobStatus getPendingStatus() {
+ return pendingStatus;
+ }
+
+ public void setPendingStatus(JobStatus pendingStatus) {
+ this.pendingStatus = pendingStatus;
+ }
+
+ public List<Exception> getPendingExceptions() {
+ return pendingExceptions;
+ }
+
+ public void setPendingExceptions(List<Exception> pendingExceptions) {
+ this.pendingExceptions = pendingExceptions;
+ }
+
+ public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
+ return operatorLocations;
+ }
+
+ public void setOperatorLocations(Map<OperatorDescriptorId, List<String>> operatorLocations) {
+ this.operatorLocations = operatorLocations;
+ }
+
+ public JobId getJobId() {
+ return jobId;
+ }
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
index ec27653..b2a0aec 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
@@ -25,8 +25,6 @@
public class LifeCycleComponentManager implements ILifeCycleComponentManager {
- public final static LifeCycleComponentManager INSTANCE = new LifeCycleComponentManager();
-
public static final class Config {
public static final String DUMP_PATH_KEY = "DUMP_PATH";
}
@@ -38,7 +36,7 @@
private String dumpPath;
private boolean configured;
- private LifeCycleComponentManager() {
+ public LifeCycleComponentManager() {
components = new ArrayList<ILifeCycleComponent>();
stopInitiated = false;
configured = false;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 0463350..5ec9c19 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -42,6 +42,7 @@
import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.topology.ClusterTopology;
import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
@@ -55,6 +56,7 @@
import edu.uci.ics.hyracks.control.cc.work.CliUnDeployBinaryWork;
import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
+import edu.uci.ics.hyracks.control.cc.work.GetJobInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
@@ -350,6 +352,13 @@
return;
}
+ case GET_JOB_INFO: {
+ HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
+ workQueue.schedule(new GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
+ new IPCResponder<JobInfo>(handle, mid)));
+ return;
+ }
+
case START_JOB: {
HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
JobId jobId = createJobId();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index bae0eb5..dab05f7 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -16,6 +16,7 @@
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,6 +30,7 @@
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -87,6 +89,8 @@
private List<Exception> pendingExceptions;
+ private Map<OperatorDescriptorId, List<String>> operatorLocations;
+
public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
this.deploymentId = deploymentId;
@@ -101,6 +105,7 @@
cleanupPendingNodeIds = new HashSet<String>();
profile = new JobProfile(jobId);
connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+ operatorLocations = new HashMap<OperatorDescriptorId, List<String>>();
}
public DeploymentId getDeploymentId() {
@@ -178,6 +183,15 @@
this.endTime = endTime;
}
+ public void registerOperatorLocation(OperatorDescriptorId op, String location) {
+ List<String> locations = operatorLocations.get(op);
+ if (locations == null) {
+ locations = new ArrayList<String>();
+ operatorLocations.put(op, locations);
+ }
+ locations.add(location);
+ }
+
@Override
public synchronized void waitForCompletion() throws Exception {
while (status != JobStatus.TERMINATED && status != JobStatus.FAILURE) {
@@ -348,8 +362,7 @@
taskAttempt.put("end-time", ta.getEndTime());
List<Exception> exceptions = ta.getExceptions();
if (exceptions != null && !exceptions.isEmpty()) {
- List<Exception> filteredExceptions = ExceptionUtils
- .getActualExceptions(exceptions);
+ List<Exception> filteredExceptions = ExceptionUtils.getActualExceptions(exceptions);
for (Exception exception : filteredExceptions) {
StringWriter exceptionWriter = new StringWriter();
exception.printStackTrace(new PrintWriter(exceptionWriter));
@@ -379,4 +392,8 @@
return result;
}
-}
\ No newline at end of file
+
+ public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
+ return operatorLocations;
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index fd6360a..d2c018f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -92,6 +93,7 @@
public void startJob() throws HyracksException {
startRunnableActivityClusters();
+ ccs.getApplicationContext().notifyJobStart(jobRun.getJobId());
}
private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots)
@@ -326,6 +328,8 @@
tads = new ArrayList<TaskAttemptDescriptor>();
taskAttemptMap.put(nodeId, tads);
}
+ OperatorDescriptorId opId = tid.getActivityId().getOperatorDescriptorId();
+ jobRun.registerOperatorLocation(opId, nodeId);
ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(),
apd.getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java
new file mode 100644
index 0000000..c2e08f2
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetJobInfoWork extends SynchronizableWork {
+ private final ClusterControllerService ccs;
+ private final JobId jobId;
+ private final IResultCallback<JobInfo> callback;
+
+ public GetJobInfoWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobInfo> callback) {
+ this.ccs = ccs;
+ this.jobId = jobId;
+ this.callback = callback;
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ JobRun run = ccs.getActiveRunMap().get(jobId);
+ if (run == null) {
+ run = ccs.getRunMapArchive().get(jobId);
+ }
+ JobInfo info = run == null ? null
+ : new JobInfo(run.getJobId(), run.getStatus(), run.getOperatorLocations());
+ callback.setValue(info);
+ } catch (Exception e) {
+ callback.setException(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 43129f5..1d4daf7 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -123,4 +123,5 @@
}
}
-}
\ No newline at end of file
+}
+
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
index fce7180..8c88627 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
@@ -19,7 +19,6 @@
import org.kohsuke.args4j.CmdLineParser;
-import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
public class NCDriver {
@@ -39,9 +38,9 @@
final NodeControllerService nService = new NodeControllerService(ncConfig);
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.severe("Setting uncaught exception handler " + LifeCycleComponentManager.INSTANCE);
+ LOGGER.severe("Setting uncaught exception handler " + nService.getLifeCycleComponentManager());
}
- Thread.currentThread().setUncaughtExceptionHandler(LifeCycleComponentManager.INSTANCE);
+ Thread.currentThread().setUncaughtExceptionHandler(nService.getLifeCycleComponentManager());
nService.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 6049a3b..da2d0de 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -50,6 +50,8 @@
import edu.uci.ics.hyracks.api.deployment.DeploymentId;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
import edu.uci.ics.hyracks.control.common.base.IClusterController;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -130,6 +132,8 @@
private INCApplicationEntryPoint ncAppEntryPoint;
+ private final ILifeCycleComponentManager lccm;
+
private final MemoryMXBean memoryMXBean;
private final List<GarbageCollectorMXBean> gcMXBeans;
@@ -158,6 +162,7 @@
partitionManager = new PartitionManager(this);
netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
+ lccm = new LifeCycleComponentManager();
queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
timer = new Timer(true);
@@ -181,6 +186,10 @@
return appCtx;
}
+ public ILifeCycleComponentManager getLifeCycleComponentManager() {
+ return lccm;
+ }
+
private static List<IODeviceHandle> getDevices(String ioDevices) {
List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
StringTokenizer tok = new StringTokenizer(ioDevices, ",");
@@ -281,7 +290,7 @@
}
private void startApplication() throws Exception {
- appCtx = new NCApplicationContext(serverCtx, ctx, id, memoryManager);
+ appCtx = new NCApplicationContext(serverCtx, ctx, id, memoryManager, lccm);
String className = ncConfig.appNCMainClass;
if (className != null) {
Class<?> c = Class.forName(className);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
index 4b8eb53..4ef8d9a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
@@ -19,26 +19,34 @@
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
import edu.uci.ics.hyracks.api.resources.memory.IMemoryManager;
import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
import edu.uci.ics.hyracks.control.nc.resources.memory.MemoryManager;
public class NCApplicationContext extends ApplicationContext implements INCApplicationContext {
+ private final ILifeCycleComponentManager lccm;
private final String nodeId;
private final IHyracksRootContext rootCtx;
private final MemoryManager memoryManager;
private Object appObject;
public NCApplicationContext(ServerContext serverCtx, IHyracksRootContext rootCtx, String nodeId,
- MemoryManager memoryManager) throws IOException {
+ MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager) throws IOException {
super(serverCtx);
+ this.lccm = lifeCyclecomponentManager;
this.nodeId = nodeId;
this.rootCtx = rootCtx;
this.memoryManager = memoryManager;
}
@Override
+ public ILifeCycleComponentManager getLifeCycleComponentManager() {
+ return lccm;
+ }
+
+ @Override
public String getNodeId() {
return nodeId;
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 15db1fe..0e63485 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -139,7 +139,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("open(" + pid + " by " + taId);
}
- fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+ fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString().replace(":", "$"));
handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
size = 0;
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
index 5ac2961..d9b4984 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -101,8 +101,8 @@
protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
protected final static String sep = System.getProperty("file.separator");
protected final String dateString = simpleDateFormat.format(new Date());
- protected final String primaryFileName = System.getProperty("java.io.tmpdir") + sep + "primaryBtree" + dateString;
- protected final String btreeFileName = System.getProperty("java.io.tmpdir") + sep + "invIndexBtree" + dateString;
+ protected final String primaryFileName = "primaryBtree" + dateString;
+ protected final String btreeFileName = "invIndexBtree" + dateString;
protected IFileSplitProvider primaryFileSplitProvider = new ConstantFileSplitProvider(
new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
@@ -194,7 +194,7 @@
private IOperatorDescriptor createFileScanOp(JobSpecification spec) {
FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
- "data/cleanednumbereddblptitles.txt"))) };
+ "data" + File.separator + "cleanednumbereddblptitles.txt"))) };
IFileSplitProvider dblpTitleSplitProvider = new ConstantFileSplitProvider(dblpTitleFileSplits);
RecordDescriptor dblpTitleRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 39fa19b..7356644 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -80,7 +80,7 @@
ccConfig.clusterNetIpAddress = "127.0.0.1";
ccConfig.clusterNetPort = 39001;
ccConfig.profileDumpPeriod = 10000;
- File outDir = new File("target/ClusterController");
+ File outDir = new File("target" + File.separator + "ClusterController");
outDir.mkdirs();
File ccRoot = File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir);
ccRoot.delete();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 22ff84e..ddaa7cf 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -77,7 +77,7 @@
ccConfig.clusterNetIpAddress = "127.0.0.1";
ccConfig.clusterNetPort = 39001;
ccConfig.profileDumpPeriod = 10000;
- File outDir = new File("target/ClusterController");
+ File outDir = new File("target" + File.separator + "ClusterController");
outDir.mkdirs();
File ccRoot = File.createTempFile(AbstractMultiNCIntegrationTest.class.getName(), ".data", outDir);
ccRoot.delete();
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index 3e4ecf8..f65c78a 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -163,12 +163,11 @@
throw new HyracksDataException("Failed to destroy the index since it is activated.");
}
- file.delete();
if (fileId == -1) {
return;
}
-
bufferCache.deleteFile(fileId, false);
+ file.delete();
fileId = -1;
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 443ad2b..8420c73 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -344,4 +344,4 @@
public ILSMOperationTracker getOperationTracker() {
return opTracker;
}
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
index 9b77e23..d518122 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
@@ -20,10 +20,13 @@
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
import edu.uci.ics.hyracks.api.messages.IMessageBroker;
import edu.uci.ics.hyracks.api.resources.memory.IMemoryManager;
public class TestNCApplicationContext implements INCApplicationContext {
+ private final ILifeCycleComponentManager lccm;
private final IHyracksRootContext rootCtx;
private final String nodeId;
@@ -33,6 +36,7 @@
private final IMemoryManager mm;
public TestNCApplicationContext(IHyracksRootContext rootCtx, String nodeId) {
+ this.lccm = new LifeCycleComponentManager();
this.rootCtx = rootCtx;
this.nodeId = nodeId;
mm = new IMemoryManager() {
@@ -94,11 +98,11 @@
return null;
}
- @Override
- public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
- // TODO Auto-generated method stub
- return null;
- }
+ @Override
+ public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
+ // TODO Auto-generated method stub
+ return null;
+ }
@Override
public IMemoryManager getMemoryManager() {
@@ -116,4 +120,9 @@
// TODO Auto-generated method stub
}
+
+ @Override
+ public ILifeCycleComponentManager getLifeCycleComponentManager() {
+ return lccm;
+ }
}
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index 26cb8d0..f42dfe4 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -618,6 +618,10 @@
* Terminate the current partition where the current vertex stays in.
* This will immediately take effect and the upcoming vertice in the
* same partition cannot be processed.
+ * <<<<<<< HEAD
+ *
+ =======
+ * >>>>>>> master
*/
protected final void terminatePartition() {
voteToHalt();
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
index 638011b..3fe13b1 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
@@ -173,4 +173,4 @@
}
}
-}
+}
\ No newline at end of file