Merged hyracks master back into VXQuery branch.
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablePolicy.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablePolicy.java
new file mode 100644
index 0000000..207204e
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablePolicy.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+
+/**
+ * Default policy for {@link InlineVariablesRule}: inlines every assigned expression except calls to
+ * do-not-inline functions, follows all operator inputs, and skips variable-reference-only operators.
+ */
+public class InlineVariablePolicy implements InlineVariablesRule.IInlineVariablePolicy {
+
+    /**
+     * @return false only when {@code expr} is a call to a function in {@code doNotInlineFuncs}
+     */
+    @Override
+    public boolean addExpressionToInlineMap(ILogicalExpression expr, Set<FunctionIdentifier> doNotInlineFuncs) {
+        // Only do-not-inline function calls are excluded; any other expression kind stays inlinable.
+        if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+            return true;
+        }
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+        return !doNotInlineFuncs.contains(funcExpr.getFunctionIdentifier());
+    }
+
+    /** @return a new list holding every input reference of {@code op}, in iteration order */
+    @Override
+    public List<Mutable<ILogicalOperator>> descendIntoNextOperator(AbstractLogicalOperator op) {
+        List<Mutable<ILogicalOperator>> descendOp = new ArrayList<Mutable<ILogicalOperator>>();
+        // Follow every child; this policy neither skips nor removes operators.
+        for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+            descendOp.add(inputOpRef);
+        }
+        return descendOp;
+    }
+
+    /** @return true when {@code op} does not require variable reference expressions */
+    @Override
+    public boolean transformOperator(AbstractLogicalOperator op) {
+        // Only inline variables in operators that can deal with arbitrary expressions.
+        return !op.requiresVariableReferenceExpressions();
+    }
+
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 40b049b..ac8505e 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -27,7 +27,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
@@ -73,12 +72,22 @@
 
     // Visitor for replacing variable reference expressions with their originating expression.
     protected InlineVariablesVisitor inlineVisitor = new InlineVariablesVisitor(varAssignRhs);
-    
+
     // Set of FunctionIdentifiers that we should not inline.
     protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<FunctionIdentifier>();
-    
+
     protected boolean hasRun = false;
     
+    protected IInlineVariablePolicy policy;
+
+    public InlineVariablesRule() {
+        this(new InlineVariablePolicy()); // no-arg form uses the default policy
+    }
+    /** Creates the rule with a caller-supplied inlining policy. */
+    public InlineVariablesRule(IInlineVariablePolicy policy) {
+        this.policy = policy;
+    }
+    
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
         return false;
@@ -100,15 +109,14 @@
         hasRun = true;
         return modified;
     }
-    
+
     protected void prepare(IOptimizationContext context) {
         varAssignRhs.clear();
         inlineVisitor.setContext(context);
     }
-    
+
     protected boolean performBottomUpAction(AbstractLogicalOperator op) throws AlgebricksException {
-        // Only inline variables in operators that can deal with arbitrary expressions.
-        if (!op.requiresVariableReferenceExpressions()) {
+        if (policy.transformOperator(op)) {
             inlineVisitor.setOperator(op);
             return op.acceptExpressionTransform(inlineVisitor);
         }
@@ -118,41 +126,36 @@
     protected boolean performFinalAction() throws AlgebricksException {
         return false;
     }
-    
+
     protected boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        
+
         // Update mapping from variables to expressions during top-down traversal.
         if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
             AssignOperator assignOp = (AssignOperator) op;
             List<LogicalVariable> vars = assignOp.getVariables();
-            List<Mutable<ILogicalExpression>> exprs = assignOp.getExpressions();            
+            List<Mutable<ILogicalExpression>> exprs = assignOp.getExpressions();
             for (int i = 0; i < vars.size(); i++) {
                 ILogicalExpression expr = exprs.get(i).getValue();
-                // Ignore functions that are in the doNotInline set.                
-                if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
-                    AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
-                    if (doNotInlineFuncs.contains(funcExpr.getFunctionIdentifier())) {
-                        continue;
-                    }
+                if (policy.addExpressionToInlineMap(expr, doNotInlineFuncs)) {
+                    varAssignRhs.put(vars.get(i), exprs.get(i).getValue());
                 }
-                varAssignRhs.put(vars.get(i), exprs.get(i).getValue());
             }
         }
 
-        // Descend into children removing projects on the way.
         boolean modified = false;
-        for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
+        // Follow all operators from this operator.
+        for (Mutable<ILogicalOperator> inputOpRef : policy.descendIntoNextOperator(op)) {
             if (inlineVariables(inputOpRef, context)) {
                 modified = true;
-            }            
+            }
         }
 
         if (performBottomUpAction(op)) {
             modified = true;
         }
-        
+
         if (modified) {
             context.computeAndSetTypeEnvironmentForOperator(op);
             context.addToDontApplySet(this, op);
@@ -164,23 +167,23 @@
     }
 
     protected class InlineVariablesVisitor implements ILogicalExpressionReferenceTransform {
-        
+
         private final Map<LogicalVariable, ILogicalExpression> varAssignRhs;
         private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
-        private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>();        
+        private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>();
         private ILogicalOperator op;
         private IOptimizationContext context;
         // If set, only replace this variable reference.
         private LogicalVariable targetVar;
-        
+
         public InlineVariablesVisitor(Map<LogicalVariable, ILogicalExpression> varAssignRhs) {
             this.varAssignRhs = varAssignRhs;
         }
-        
+
         public void setTargetVariable(LogicalVariable targetVar) {
             this.targetVar = targetVar;
         }
-        
+
         public void setContext(IOptimizationContext context) {
             this.context = context;
         }
@@ -189,7 +192,7 @@
             this.op = op;
             liveVars.clear();
         }
-        
+
         @Override
         public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {            
             ILogicalExpression e = exprRef.getValue();
@@ -209,7 +212,7 @@
                         // Variable was not produced by an assign.
                         return false;
                     }
-                    
+
                     // Make sure used variables from rhs are live.
                     if (liveVars.isEmpty()) {
                         VariableUtilities.getLiveVariables(op, liveVars);
@@ -221,7 +224,7 @@
                             return false;
                         }
                     }
-                    
+
                     // Replace variable reference with a clone of the rhs expr.
                     exprRef.setValue(rhs.cloneExpression());
                     return true;
@@ -242,4 +245,15 @@
             }
         }
     }
+    
+    /** Hooks that decide what InlineVariablesRule inlines and which operators it walks. */
+    public interface IInlineVariablePolicy {
+        /** Returns true if {@code expr}, an assign's right-hand side, should be recorded for inlining. */
+        boolean addExpressionToInlineMap(ILogicalExpression expr, Set<FunctionIdentifier> doNotInlineFuncs);
+        /** Returns the operator references the rule recurses into from {@code op}. */
+        List<Mutable<ILogicalOperator>> descendIntoNextOperator(AbstractLogicalOperator op);
+        /** Returns true if the rule should run its inlining visitor over the expressions of {@code op}. */
+        boolean transformOperator(AbstractLogicalOperator op);
+    }
+
 }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 0463350..629ac5d 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -242,6 +242,7 @@
     @Override
     public void stop() throws Exception {
         LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
+        datasetDirectoryService.stop();
         executor.shutdownNow();
         webServer.stop();
         sweeper.cancel();
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index ee1cc67..7b060c7 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -19,6 +19,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
@@ -41,21 +42,28 @@
  * job.
  */
 public class DatasetDirectoryService implements IDatasetDirectoryService {
+    private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName());
+
     private final long resultTTL;
 
     private final long resultSweepThreshold;
 
     private final Map<JobId, IDatasetStateRecord> jobResultLocations;
 
+    private ResultStateSweeper resultStateSweeper;
+
     public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
         jobResultLocations = new LinkedHashMap<JobId, IDatasetStateRecord>();
+        resultStateSweeper = null;
     }
 
     @Override
     public void init(ExecutorService executor) {
-        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+        LOGGER.info("New ResultStateSweepser in DatasetDirectoryService.");
+        resultStateSweeper = new ResultStateSweeper(this, resultTTL, resultSweepThreshold);
+        executor.execute(resultStateSweeper);
     }
 
     @Override
@@ -260,4 +268,9 @@
         }
         return null;
     }
+
+    @Override
+    public void stop() { // stops the result sweeper; safe to call even if init() never ran
+        if (resultStateSweeper != null) { resultStateSweeper.close(); } // field stays null until init()
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
index 3de3c50..05ed177 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/IDatasetDirectoryService.java
@@ -42,4 +42,6 @@
 
     public DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
             DatasetDirectoryRecord[] knownLocations) throws HyracksDataException;
+
+    public void stop();
 }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
index 69b560c..a5ad630 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -39,16 +39,19 @@
 
     private final List<JobId> toBeCollected;
 
+    private volatile boolean running; // volatile: close() clears it from a thread other than run()'s
+
     public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) {
         this.datasetManager = datasetManager;
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
         toBeCollected = new ArrayList<JobId>();
+        running = true;
     }
 
     @Override
     public void run() {
-        while (true) {
+        while (running) {
             try {
                 Thread.sleep(resultSweepThreshold);
                 sweep();
@@ -57,9 +60,14 @@
                 // There isn't much we can do really here
             }
         }
-
+        LOGGER.info("Result cleaner thread has stopped.");
     }
 
+    public void close() { // asks run() to leave its loop; does not itself interrupt the sweeper thread
+        running = false; // run() tests this flag at the top of each loop iteration
+        LOGGER.info("Result cleaner thread has been flagged to stop.");
+    }
+
     private void sweep() {
         synchronized (datasetManager) {
             toBeCollected.clear();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 325e2bc..b28ffaa 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -50,6 +50,8 @@
 
     private final DatasetMemoryManager datasetMemoryManager;
 
+    private final ResultStateSweeper resultStateSweeper;
+
     public DatasetPartitionManager(NodeControllerService ncs, Executor executor, int availableMemory, long resultTTL,
             long resultSweepThreshold) {
         this.ncs = ncs;
@@ -62,7 +64,8 @@
             datasetMemoryManager = null;
         }
         partitionResultStateMap = new LinkedHashMap<JobId, IDatasetStateRecord>();
-        executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold));
+        resultStateSweeper = new ResultStateSweeper(this, resultTTL, resultSweepThreshold);
+        executor.execute(resultStateSweeper);
     }
 
     @Override
@@ -208,6 +211,7 @@
             deinit(entry.getKey());
         }
         deallocatableRegistry.close();
+        resultStateSweeper.close();
     }
 
     @Override