Merged hyracks master back into VXQuery branch.
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablePolicy.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablePolicy.java
new file mode 100644
index 0000000..207204e
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablePolicy.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+
+/**
+ * Where assign operators only assign a new variable ID for a reference expression,
+ * all references are updated to the first variable ID.
+ */
+public class InlineVariablePolicy implements InlineVariablesRule.IInlineVariablePolicy {
+
+ @Override
+ public boolean addExpressionToInlineMap(ILogicalExpression expr, Set<FunctionIdentifier> doNotInlineFuncs) {
+ if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+ AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+ if (doNotInlineFuncs.contains(funcExpr.getFunctionIdentifier())) {
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public List<Mutable<ILogicalOperator>> descendIntoNextOperator(AbstractLogicalOperator op) {
+ List<Mutable<ILogicalOperator>> descendOp = new ArrayList<Mutable<ILogicalOperator>>();
+ // Descend into children removing projects on the way.
+ for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+ descendOp.add(inputOpRef);
+ }
+ return descendOp;
+ }
+
+ @Override
+ public boolean transformOperator(AbstractLogicalOperator op) {
+ // Only inline variables in operators that can deal with arbitrary expressions.
+ if (!op.requiresVariableReferenceExpressions()) {
+ return true;
+ }
+ return false;
+ }
+
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 40b049b..ac8505e 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -27,7 +27,6 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
@@ -73,12 +72,22 @@
// Visitor for replacing variable reference expressions with their originating expression.
protected InlineVariablesVisitor inlineVisitor = new InlineVariablesVisitor(varAssignRhs);
-
+
// Set of FunctionIdentifiers that we should not inline.
protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<FunctionIdentifier>();
-
+
protected boolean hasRun = false;
+ protected IInlineVariablePolicy policy;
+
+ public InlineVariablesRule() {
+ policy = new InlineVariablePolicy();
+ }
+
+ public InlineVariablesRule(IInlineVariablePolicy policy) {
+ this.policy = policy;
+ }
+
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
return false;
@@ -100,15 +109,14 @@
hasRun = true;
return modified;
}
-
+
protected void prepare(IOptimizationContext context) {
varAssignRhs.clear();
inlineVisitor.setContext(context);
}
-
+
protected boolean performBottomUpAction(AbstractLogicalOperator op) throws AlgebricksException {
- // Only inline variables in operators that can deal with arbitrary expressions.
- if (!op.requiresVariableReferenceExpressions()) {
+ if (policy.transformOperator(op)) {
inlineVisitor.setOperator(op);
return op.acceptExpressionTransform(inlineVisitor);
}
@@ -118,41 +126,36 @@
protected boolean performFinalAction() throws AlgebricksException {
return false;
}
-
+
protected boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-
+
// Update mapping from variables to expressions during top-down traversal.
if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
AssignOperator assignOp = (AssignOperator) op;
List<LogicalVariable> vars = assignOp.getVariables();
- List<Mutable<ILogicalExpression>> exprs = assignOp.getExpressions();
+ List<Mutable<ILogicalExpression>> exprs = assignOp.getExpressions();
for (int i = 0; i < vars.size(); i++) {
ILogicalExpression expr = exprs.get(i).getValue();
- // Ignore functions that are in the doNotInline set.
- if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
- AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
- if (doNotInlineFuncs.contains(funcExpr.getFunctionIdentifier())) {
- continue;
- }
+ if (policy.addExpressionToInlineMap(expr, doNotInlineFuncs)) {
+ varAssignRhs.put(vars.get(i), exprs.get(i).getValue());
}
- varAssignRhs.put(vars.get(i), exprs.get(i).getValue());
}
}
- // Descend into children removing projects on the way.
boolean modified = false;
- for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+ // Follow all operators from this operator.
+ for (Mutable<ILogicalOperator> inputOpRef : policy.descendIntoNextOperator(op)) {
if (inlineVariables(inputOpRef, context)) {
modified = true;
- }
+ }
}
if (performBottomUpAction(op)) {
modified = true;
}
-
+
if (modified) {
context.computeAndSetTypeEnvironmentForOperator(op);
context.addToDontApplySet(this, op);
@@ -164,23 +167,23 @@
}
protected class InlineVariablesVisitor implements ILogicalExpressionReferenceTransform {
-
+
private final Map<LogicalVariable, ILogicalExpression> varAssignRhs;
private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
- private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>();
+ private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>();
private ILogicalOperator op;
private IOptimizationContext context;
// If set, only replace this variable reference.
private LogicalVariable targetVar;
-
+
public InlineVariablesVisitor(Map<LogicalVariable, ILogicalExpression> varAssignRhs) {
this.varAssignRhs = varAssignRhs;
}
-
+
public void setTargetVariable(LogicalVariable targetVar) {
this.targetVar = targetVar;
}
-
+
public void setContext(IOptimizationContext context) {
this.context = context;
}
@@ -189,7 +192,7 @@
this.op = op;
liveVars.clear();
}
-
+
@Override
public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
ILogicalExpression e = exprRef.getValue();
@@ -209,7 +212,7 @@
// Variable was not produced by an assign.
return false;
}
-
+
// Make sure used variables from rhs are live.
if (liveVars.isEmpty()) {
VariableUtilities.getLiveVariables(op, liveVars);
@@ -221,7 +224,7 @@
return false;
}
}
-
+
// Replace variable reference with a clone of the rhs expr.
exprRef.setValue(rhs.cloneExpression());
return true;
@@ -242,4 +245,15 @@
}
}
}
+
+ public static interface IInlineVariablePolicy {
+
+ public boolean addExpressionToInlineMap(ILogicalExpression expr, Set<FunctionIdentifier> doNotInlineFuncs);
+
+ public List<Mutable<ILogicalOperator>> descendIntoNextOperator(AbstractLogicalOperator op);
+
+ public boolean transformOperator(AbstractLogicalOperator op);
+
+ }
+
}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 0463350..629ac5d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -242,6 +242,7 @@
@Override
public void stop() throws Exception {
LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
+ datasetDirectoryService.stop();
executor.shutdownNow();
webServer.stop();
sweeper.cancel();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index ee1cc67..7b060c7 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
@@ -41,21 +42,28 @@
* job.
*/
public class DatasetDirectoryService implements IDatasetDirectoryService {
+ private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName());
+
private final long resultTTL;
private final long resultSweepThreshold;
private final Map<JobId, IDatasetStateRecord> jobResultLocations;
+ private ResultStateSweeper resultStateSweeper;
+
public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
jobResultLocations = new LinkedHashMap<JobId, IDatasetStateRecord>();
+ resultStateSweeper = null;
}
@Override
public void init(ExecutorService executor) {
- executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+ LOGGER.info("New ResultStateSweepser in DatasetDirectoryService.");
+ resultStateSweeper = new ResultStateSweeper(this, resultTTL, resultSweepThreshold);
+ executor.execute(resultStateSweeper);
}
@Override
@@ -260,4 +268,9 @@
}
return null;
}
+
+ @Override
+ public void stop() {
+ resultStateSweeper.close();
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 3de3c50..05ed177 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -42,4 +42,6 @@
public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
+
+ public void stop();
}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
index 69b560c..a5ad630 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -39,16 +39,19 @@
private final List<JobId> toBeCollected;
+ private boolean running;
+
public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) {
this.datasetManager = datasetManager;
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
toBeCollected = new ArrayList<JobId>();
+ running = true;
}
@Override
public void run() {
- while (true) {
+ while (running) {
try {
Thread.sleep(resultSweepThreshold);
sweep();
@@ -57,9 +60,14 @@
// There isn't much we can do really here
}
}
-
+ LOGGER.info("Result cleaner thread has stopped.");
}
+ public void close() {
+ running = false;
+ LOGGER.info("Result cleaner thread has been flagged to stop.");
+ }
+
private void sweep() {
synchronized (datasetManager) {
toBeCollected.clear();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 325e2bc..b28ffaa 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -50,6 +50,8 @@
private final DatasetMemoryManager datasetMemoryManager;
+ private final ResultStateSweeper resultStateSweeper;
+
public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
long resultSweepThreshold) {
this.ncs = ncs;
@@ -62,7 +64,8 @@
datasetMemoryManager = null;
}
partitionResultStateMap = new LinkedHashMap<JobId, IDatasetStateRecord>();
- executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+ resultStateSweeper = new ResultStateSweeper(this, resultTTL, resultSweepThreshold);
+ executor.execute(resultStateSweeper);
}
@Override
@@ -208,6 +211,7 @@
deinit(entry.getKey());
}
deallocatableRegistry.close();
+ resultStateSweeper.close();
}
@Override