diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalExpression.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalExpression.java
index 84d7fab..c4ae9bc 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalExpression.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalExpression.java
@@ -67,4 +67,13 @@
     public boolean splitIntoConjuncts(List<Mutable<ILogicalExpression>> conjs);
 
     public abstract ILogicalExpression cloneExpression();
+
+    /**
+     * Reports whether this expression is functional. Rewrite rules check this before inlining,
+     * sharing, or duplicating an expression and leave non-functional expressions where they are.
+     * NOTE(review): "functional" presumably means deterministic and free of side effects -- confirm.
+     *
+     * @return true if the expression is functional, false otherwise
+     */
+    public boolean isFunctional();
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
index c11dd80..d91cac0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/ILogicalOperator.java
@@ -21,6 +21,7 @@
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -32,6 +33,16 @@
 
 public interface ILogicalOperator {
 
+    /**
+     * @return the tag that identifies which kind of logical operator this is
+     */
+    public LogicalOperatorTag getOperatorTag();
+
+    /**
+     * @return the execution mode currently assigned to this operator
+     */
+    public ExecutionMode getExecutionMode();
+
     public List<Mutable<ILogicalOperator>> getInputs();
 
     boolean hasInputs();
@@ -89,7 +100,7 @@
     public IPhysicalPropertiesVector getDeliveredPhysicalProperties();
 
     public void computeDeliveredPhysicalProperties(IOptimizationContext context) throws AlgebricksException;
-    
+
     /**
      * Indicates whether the expressions used by this operator must be variable reference expressions.
      */
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java
index 0c19c79..b8a9c87 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractFunctionCallExpression.java
@@ -323,4 +323,24 @@
         }
     }
 
+    /**
+     * A function call is functional only if the called function is functional and every argument
+     * expression is functional too.
+     *
+     * @return false when the function info or any argument reports non-functional, true otherwise
+     */
+    @Override
+    public boolean isFunctional() {
+        if (finfo.isFunctional()) {
+            for (Mutable<ILogicalExpression> argRef : arguments) {
+                ILogicalExpression arg = argRef.getValue();
+                if (!arg.isFunctional()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractLogicalExpression.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractLogicalExpression.java
index abc46c0..2718076 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractLogicalExpression.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/AbstractLogicalExpression.java
@@ -36,4 +36,15 @@
         // do nothing
     }
 
+    /**
+     * Default answer for expressions: functional. Subclasses whose result may be non-functional
+     * override this method.
+     *
+     * @return always true in this base implementation
+     */
+    @Override
+    public boolean isFunctional() {
+        return true;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java
index 0482133..1a3fe47 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/expressions/ConstantExpression.java
@@ -170,4 +170,5 @@
     public boolean splitIntoConjuncts(List<Mutable<ILogicalExpression>> conjs) {
         return false;
     }
+
 }
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AbstractFunctionInfo.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AbstractFunctionInfo.java
new file mode 100644
index 0000000..fe47def
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/AbstractFunctionInfo.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.algebricks.core.algebra.functions;
+
+import java.io.Serializable;
+
+/**
+ * Skeleton {@link IFunctionInfo} that stores the functional flag supplied at construction time and
+ * returns it from {@link #isFunctional()}. Subclasses still have to provide
+ * {@link IFunctionInfo#getFunctionIdentifier()}.
+ */
+public abstract class AbstractFunctionInfo implements IFunctionInfo, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // Value reported by isFunctional(); assigned once in the constructor.
+    private final boolean isFunctional;
+
+    /**
+     * @param isFunctional
+     *            value that {@link #isFunctional()} will return
+     */
+    protected AbstractFunctionInfo(boolean isFunctional) {
+        this.isFunctional = isFunctional;
+    }
+
+    /**
+     * @return the flag passed to the constructor
+     */
+    @Override
+    public boolean isFunctional() {
+        return isFunctional;
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java
index 4c294a7..02cd6de 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/functions/IFunctionInfo.java
@@ -14,7 +14,12 @@
  */
 package edu.uci.ics.hyracks.algebricks.core.algebra.functions;
 
-
 public interface IFunctionInfo {
     FunctionIdentifier getFunctionIdentifier();
+
+    /**
+     * @return whether the described function is functional; a function call expression reports
+     *         itself as non-functional as soon as its function info returns false here
+     */
+    public boolean isFunctional();
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index a7e8479..78efd0d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -69,6 +69,7 @@
         // outputs = new ArrayList<LogicalOperatorReference>();
     }
 
+    @Override
     public abstract LogicalOperatorTag getOperatorTag();
 
     public ExecutionMode getExecutionMode() {
@@ -182,7 +183,7 @@
         return new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(), ctx.getNullableTypeComputer(),
                 ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
     }
-    
+
     @Override
     public boolean requiresVariableReferenceExpressions() {
         return true;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/EmptyTupleSourceOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/EmptyTupleSourceOperator.java
index 710e02f..5872e6d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/EmptyTupleSourceOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/EmptyTupleSourceOperator.java
@@ -29,12 +29,6 @@
 
 public class EmptyTupleSourceOperator extends AbstractLogicalOperator {
 
-    // public final static EmptyTupleSourceOperator INSTANCE = new
-    // EmptyTupleSourceOperator();
-
-    public EmptyTupleSourceOperator() {
-    }
-
     @Override
     public LogicalOperatorTag getOperatorTag() {
         return LogicalOperatorTag.EMPTYTUPLESOURCE;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 6285b96..fc926d2 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -75,15 +75,17 @@
         boolean change = false;
         switch (op.getOperatorTag()) {
             case DATASOURCESCAN: {
-                // ILogicalExpression e = ((UnnestOperator) op).getExpression();
-                // if (AnalysisUtil.isDataSetCall(e)) {
                 op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
-                AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-                if (child.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
+                AbstractLogicalOperator currentOp = op;
+                while (currentOp.getInputs().size() == 1) {
+                    AbstractLogicalOperator child = (AbstractLogicalOperator) currentOp.getInputs().get(0).getValue();
+                    if (child.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+                        break;
+                    }
                     child.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+                    currentOp = child;
                 }
                 change = true;
-                // }
                 break;
             }
             case NESTEDTUPLESOURCE: {
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletFunction.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletFunction.java
index 47148ac..ba5c5aa 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletFunction.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletFunction.java
@@ -14,13 +14,16 @@
  */
 package edu.uci.ics.hyracks.algebricks.examples.piglet.metadata;
 
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AbstractFunctionInfo;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 
-public class PigletFunction implements IFunctionInfo {
+public class PigletFunction extends AbstractFunctionInfo {
+    private static final long serialVersionUID = 1L;
+
     private final FunctionIdentifier fid;
 
     public PigletFunction(FunctionIdentifier fid) {
+        super(true);
         this.fid = fid;
     }
 
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
index 767f458..1c2cfae 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/rewriter/PigletRewriteRuleset.java
@@ -32,7 +32,7 @@
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.InlineVariablesRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushLimitDownRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.CopyLimitDownRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectDownRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectIntoDataSourceScanRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSelectDownRule;
@@ -106,13 +106,13 @@
         physicalPlanRewrites.add(new SetAlgebricksPhysicalOperatorsRule());
         physicalPlanRewrites.add(new EnforceStructuralPropertiesRule());
         physicalPlanRewrites.add(new PushProjectDownRule());
-        physicalPlanRewrites.add(new PushLimitDownRule());
+        physicalPlanRewrites.add(new CopyLimitDownRule());
         return physicalPlanRewrites;
     }
 
     public final static List<IAlgebraicRewriteRule> buildPhysicalRewritesTopLevelRuleCollection() {
         List<IAlgebraicRewriteRule> physicalPlanRewrites = new LinkedList<IAlgebraicRewriteRule>();
-        physicalPlanRewrites.add(new PushLimitDownRule());
+        physicalPlanRewrites.add(new CopyLimitDownRule());
         return physicalPlanRewrites;
     }
 
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
new file mode 100644
index 0000000..2f080fb
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * Copies the topmost limit of a plan further down its input chain so that the operators in between
+ * see fewer tuples. The original limit stays where it is; a clone is inserted below the deepest
+ * operator the limit can be copied past (NOTE(review): the clone is built with "false", presumably the topmost flag).
+ * <p>
+ * The walk down the chain stops at the first operator that does not have exactly one input, is not a
+ * map operator, is a SELECT or another LIMIT, or produces a variable used by the limit.
+ * <p>
+ * If the limit has an offset, the clone keeps (max objects + offset) tuples, which leaves enough
+ * input for the original limit to skip the offset afterwards.
+ */
+public class CopyLimitDownRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    /**
+     * Inserts a clone of the topmost limit below the chain of operators it can safely be copied past.
+     *
+     * @param opRef
+     *            reference to the operator being visited; only topmost LIMIT operators are rewritten
+     * @param context
+     *            optimization context, used for function lookup, type environments and the
+     *            don't-apply set
+     * @return true if a limit clone was inserted, false if the plan was left untouched
+     * @throws AlgebricksException
+     *             if analyzing the operators' variables or computing the clone's type environment fails
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.LIMIT) {
+            return false;
+        }
+        // A limit is added to the don't-apply set once it has been copied; honor that here so a
+        // repeated pass does not stack another clone below the same chain.
+        if (context.checkIfInDontApplySet(this, op)) {
+            return false;
+        }
+        LimitOperator limitOp = (LimitOperator) op;
+        if (!limitOp.isTopmostLimitOp()) {
+            return false;
+        }
+
+        List<LogicalVariable> limitUsedVars = new ArrayList<>();
+        VariableUtilities.getUsedVariables(limitOp, limitUsedVars);
+
+        Mutable<ILogicalOperator> safeOpRef = null;
+        Mutable<ILogicalOperator> candidateOpRef = limitOp.getInputs().get(0);
+
+        List<LogicalVariable> candidateProducedVars = new ArrayList<>();
+        while (true) {
+            ILogicalOperator candidateOp = candidateOpRef.getValue();
+            LogicalOperatorTag candidateOpTag = candidateOp.getOperatorTag();
+            // Refresh the produced variables for this candidate; without this the disjointness test
+            // below would compare against an empty list and never stop the walk.
+            candidateProducedVars.clear();
+            VariableUtilities.getProducedVariables(candidateOp, candidateProducedVars);
+            // Requiring exactly one input also keeps getInputs().get(0) below from failing on a leaf.
+            if (candidateOp.getInputs().size() != 1 || !candidateOp.isMap()
+                    || candidateOpTag == LogicalOperatorTag.SELECT || candidateOpTag == LogicalOperatorTag.LIMIT
+                    || !OperatorPropertiesUtil.disjoint(limitUsedVars, candidateProducedVars)) {
+                break;
+            }
+
+            safeOpRef = candidateOpRef;
+            candidateOpRef = safeOpRef.getValue().getInputs().get(0);
+        }
+
+        if (safeOpRef == null) {
+            // The operator directly below the limit cannot be passed; nothing to do.
+            return false;
+        }
+
+        ILogicalOperator safeOp = safeOpRef.getValue();
+        Mutable<ILogicalOperator> unsafeOpRef = safeOp.getInputs().get(0);
+        ILogicalOperator unsafeOp = unsafeOpRef.getValue();
+        LimitOperator limitCloneOp = null;
+        if (limitOp.getOffset().getValue() == null) {
+            limitCloneOp = new LimitOperator(limitOp.getMaxObjects().getValue(), false);
+        } else {
+            // With an offset the clone must let through max + offset tuples.
+            IFunctionInfo finfoAdd = context.getMetadataProvider().lookupFunction(
+                    AlgebricksBuiltinFunctions.NUMERIC_ADD);
+            List<Mutable<ILogicalExpression>> addArgs = new ArrayList<>();
+            addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getMaxObjects().getValue().cloneExpression()));
+            addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getOffset().getValue().cloneExpression()));
+            ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
+            limitCloneOp = new LimitOperator(maxPlusOffset, false);
+        }
+        // Splice the clone in between the deepest safe operator and its former input.
+        limitCloneOp.setPhysicalOperator(new StreamLimitPOperator());
+        limitCloneOp.getInputs().add(new MutableObject<ILogicalOperator>(unsafeOp));
+        limitCloneOp.setExecutionMode(unsafeOp.getExecutionMode());
+        limitCloneOp.recomputeSchema();
+        unsafeOpRef.setValue(limitCloneOp);
+        context.computeAndSetTypeEnvironmentForOperator(limitCloneOp);
+        context.addToDontApplySet(this, limitOp);
+        return true;
+    }
+}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index 02e090d..d4834a3 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -128,6 +128,11 @@
             /** copy the original order-by operator and insert on-top-of the subplan operator */
             context.addToDontApplySet(this, child);
             OrderOperator sourceOrderOp = (OrderOperator) child;
+            for (Pair<IOrder, Mutable<ILogicalExpression>> expr : sourceOrderOp.getOrderExpressions()) {
+                if (!expr.second.getValue().isFunctional()) {
+                    return false;
+                }
+            }
             List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs = deepCopyOrderAndExpression(sourceOrderOp
                     .getOrderExpressions());
             OrderOperator newOrderOp = new OrderOperator(orderExprs);
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
index 9e0d8d0..eafb3b0 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/ExtractCommonExpressionsRule.java
@@ -251,7 +251,7 @@
                         return true;
                     }
                 } else {
-                    if (assignCommonExpression(exprEqClass, expr)) {
+                    if (expr.isFunctional() && assignCommonExpression(exprEqClass, expr)) {
                         //re-obtain the live vars after rewriting in the method called in the if condition
                         Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
                         VariableUtilities.getLiveVariables(op, liveVars);
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index 40b049b..aa6916d 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -45,25 +45,20 @@
  * (some variables are generated by datasources).
  * Inlining variables may enable other optimizations by allowing selects and assigns to be moved
  * (e.g., a select may be pushed into a join to enable an efficient physical join operator).
- * 
  * Preconditions/Assumptions:
- * Assumes no projects are in the plan. Only inlines variables whose assigned expression is a function call 
- * (i.e., this rule ignores right-hand side constants and other variable references expressions  
- * 
+ * Assumes no projects are in the plan. Only inlines variables whose assigned expression is a function call
+ * (i.e., this rule ignores right-hand side constants and other variable reference expressions).
  * Postconditions/Examples:
  * All qualifying variables have been inlined.
- * 
  * Example (simplified):
- * 
  * Before plan:
  * select <- [$$1 < $$2 + $$0]
- *   assign [$$2] <- [funcZ() + $$0]
- *     assign [$$0, $$1] <- [funcX(), funcY()]
- * 
+ *   assign [$$2] <- [funcZ() + $$0]
+ *     assign [$$0, $$1] <- [funcX(), funcY()]
  * After plan:
  * select <- [funcY() < funcZ() + funcX() + funcX()]
- *   assign [$$2] <- [funcZ() + funcX()]
- *     assign [$$0, $$1] <- [funcX(), funcY()]
+ *   assign [$$2] <- [funcZ() + funcX()]
+ *     assign [$$0, $$1] <- [funcX(), funcY()]
  */
 public class InlineVariablesRule implements IAlgebraicRewriteRule {
 
@@ -73,12 +68,12 @@
 
     // Visitor for replacing variable reference expressions with their originating expression.
     protected InlineVariablesVisitor inlineVisitor = new InlineVariablesVisitor(varAssignRhs);
-    
+
     // Set of FunctionIdentifiers that we should not inline.
     protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<FunctionIdentifier>();
-    
+
     protected boolean hasRun = false;
-    
+
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
         return false;
@@ -100,12 +95,12 @@
         hasRun = true;
         return modified;
     }
-    
+
     protected void prepare(IOptimizationContext context) {
         varAssignRhs.clear();
         inlineVisitor.setContext(context);
     }
-    
+
     protected boolean performBottomUpAction(AbstractLogicalOperator op) throws AlgebricksException {
         // Only inline variables in operators that can deal with arbitrary expressions.
         if (!op.requiresVariableReferenceExpressions()) {
@@ -118,22 +113,22 @@
     protected boolean performFinalAction() throws AlgebricksException {
         return false;
     }
-    
+
     protected boolean inlineVariables(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
         AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        
+
         // Update mapping from variables to expressions during top-down traversal.
         if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
             AssignOperator assignOp = (AssignOperator) op;
             List<LogicalVariable> vars = assignOp.getVariables();
-            List<Mutable<ILogicalExpression>> exprs = assignOp.getExpressions();            
+            List<Mutable<ILogicalExpression>> exprs = assignOp.getExpressions();
             for (int i = 0; i < vars.size(); i++) {
                 ILogicalExpression expr = exprs.get(i).getValue();
-                // Ignore functions that are in the doNotInline set.                
+                // Ignore functions that are either in the doNotInline set or are non-functional               
                 if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
                     AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
-                    if (doNotInlineFuncs.contains(funcExpr.getFunctionIdentifier())) {
+                    if (doNotInlineFuncs.contains(funcExpr.getFunctionIdentifier()) || !funcExpr.isFunctional()) {
                         continue;
                     }
                 }
@@ -146,13 +141,13 @@
         for (Mutable<ILogicalOperator> inputOpRef : op.getInputs()) {
             if (inlineVariables(inputOpRef, context)) {
                 modified = true;
-            }            
+            }
         }
 
         if (performBottomUpAction(op)) {
             modified = true;
         }
-        
+
         if (modified) {
             context.computeAndSetTypeEnvironmentForOperator(op);
             context.addToDontApplySet(this, op);
@@ -164,23 +159,23 @@
     }
 
     protected class InlineVariablesVisitor implements ILogicalExpressionReferenceTransform {
-        
+
         private final Map<LogicalVariable, ILogicalExpression> varAssignRhs;
         private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
-        private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>();        
+        private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>();
         private ILogicalOperator op;
         private IOptimizationContext context;
         // If set, only replace this variable reference.
         private LogicalVariable targetVar;
-        
+
         public InlineVariablesVisitor(Map<LogicalVariable, ILogicalExpression> varAssignRhs) {
             this.varAssignRhs = varAssignRhs;
         }
-        
+
         public void setTargetVariable(LogicalVariable targetVar) {
             this.targetVar = targetVar;
         }
-        
+
         public void setContext(IOptimizationContext context) {
             this.context = context;
         }
@@ -189,9 +184,9 @@
             this.op = op;
             liveVars.clear();
         }
-        
+
         @Override
-        public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {            
+        public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
             ILogicalExpression e = exprRef.getValue();
             switch (((AbstractLogicalExpression) e).getExpressionTag()) {
                 case VARIABLE: {
@@ -200,16 +195,18 @@
                     if (targetVar != null && var != targetVar) {
                         return false;
                     }
+
                     // Make sure has not been excluded from inlining.
                     if (context.shouldNotBeInlined(var)) {
                         return false;
                     }
+
                     ILogicalExpression rhs = varAssignRhs.get(var);
                     if (rhs == null) {
                         // Variable was not produced by an assign.
                         return false;
                     }
-                    
+
                     // Make sure used variables from rhs are live.
                     if (liveVars.isEmpty()) {
                         VariableUtilities.getLiveVariables(op, liveVars);
@@ -221,7 +218,7 @@
                             return false;
                         }
                     }
-                    
+
                     // Replace variable reference with a clone of the rhs expr.
                     exprRef.setValue(rhs.cloneExpression());
                     return true;
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
index f03f4e3..418ee36 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushAssignBelowUnionAllRule.java
@@ -36,31 +36,28 @@
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 /**
- * Pushes an AssignOperator below a UnionAll operator by creating an new AssignOperator below each of 
+ * Pushes an AssignOperator below a UnionAll operator by creating a new AssignOperator below each of
  * the UnionAllOperator's branches with appropriate variable replacements.
- * This rule can help to enable other rules that are difficult to fire across a UnionAllOperator, 
+ * This rule can help to enable other rules that are difficult to fire across a UnionAllOperator,
  * for example, eliminating common sub-expressions.
- * 
  * Example:
- * 
  * Before plan:
  * ...
  * assign [$$20, $$21] <- [funcA($$3), funcB($$6)]
- *   union ($$1, $$2, $$3) ($$4, $$5, $$6)
- *     union_branch_0
- *       ...
- *     union_branch_1
- *       ...
- *     
+ *   union ($$1, $$2, $$3) ($$4, $$5, $$6)
+ *     union_branch_0
+ *       ...
+ *     union_branch_1
+ *       ...
  * After plan:
  * ...
  * union ($$1, $$2, $$3) ($$4, $$5, $$6) ($$22, $$24, $$20) ($$23, $$25, $$21)
- *   assign [$$22, $$23] <- [funcA($$1), funcB($$4)]
- *     union_branch_0
- *       ...
- *   assign [$$24, $$25] <- [funcA($$2), funcB($$5)]
- *     union_branch_1
- *       ...
+ *   assign [$$22, $$23] <- [funcA($$1), funcB($$4)]
+ *     union_branch_0
+ *       ...
+ *   assign [$$24, $$25] <- [funcA($$2), funcB($$5)]
+ *     union_branch_1
+ *       ...
  */
 public class PushAssignBelowUnionAllRule implements IAlgebraicRewriteRule {
 
@@ -84,6 +81,11 @@
                 continue;
             }
             AssignOperator assignOp = (AssignOperator) childOp;
+            for (Mutable<ILogicalExpression> expr : assignOp.getExpressions()) {
+                if (!expr.getValue().isFunctional()) {
+                    return false;
+                }
+            }
 
             AbstractLogicalOperator childOfChildOp = (AbstractLogicalOperator) assignOp.getInputs().get(0).getValue();
             if (childOfChildOp.getOperatorTag() != LogicalOperatorTag.UNIONALL) {
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
deleted file mode 100644
index a0bebaf..0000000
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.rewriter.rules;
-
-import java.util.LinkedList;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class PushLimitDownRule implements IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
-        return false;
-    }
-
-    /**
-     * When a global Limit over a merge-exchange is found, a local Limit is
-     * pushed down.
-     */
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
-        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.LIMIT) {
-            return false;
-        }
-        LimitOperator opLim = (LimitOperator) op;
-        if (!opLim.isTopmostLimitOp()) {
-            return false;
-        }
-
-        Mutable<ILogicalOperator> opRef2 = opLim.getInputs().get(0);
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
-
-        if (context.checkAndAddToAlreadyCompared(op, op2)) {
-            return false;
-        }
-        if (op2.getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
-            return false;
-        }
-        PhysicalOperatorTag op2PTag = op2.getPhysicalOperator().getOperatorTag();
-        // we should test for any kind of merge
-        if (op2PTag != PhysicalOperatorTag.RANDOM_MERGE_EXCHANGE && op2PTag != PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
-            return false;
-        }
-
-        LinkedList<LogicalVariable> usedVars1 = new LinkedList<LogicalVariable>();
-        VariableUtilities.getUsedVariables(opLim, usedVars1);
-
-        do {
-            if (op2.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE
-                    || op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE
-                    || op2.getOperatorTag() == LogicalOperatorTag.LIMIT) {
-                return false;
-            }
-            if (op2.getInputs().size() > 1 || !op2.isMap()) {
-                break;
-            }
-            LinkedList<LogicalVariable> vars2 = new LinkedList<LogicalVariable>();
-            VariableUtilities.getProducedVariables(op2, vars2);
-            if (!OperatorPropertiesUtil.disjoint(vars2, usedVars1)) {
-                return false;
-            }
-            // we assume pipelineable ops. have only one input
-            opRef2 = op2.getInputs().get(0);
-            op2 = (AbstractLogicalOperator) opRef2.getValue();
-        } while (true);
-
-        LimitOperator clone2 = null;
-        if (opLim.getOffset().getValue() == null) {
-            clone2 = new LimitOperator(opLim.getMaxObjects().getValue(), false);
-        } else {
-            // push limit (max+offset)
-            IFunctionInfo finfoAdd = context.getMetadataProvider().lookupFunction(
-                    AlgebricksBuiltinFunctions.NUMERIC_ADD);
-            ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd,
-                    opLim.getMaxObjects(), opLim.getOffset());
-            clone2 = new LimitOperator(maxPlusOffset, false);
-        }
-        clone2.setPhysicalOperator(new StreamLimitPOperator());
-        clone2.getInputs().add(new MutableObject<ILogicalOperator>(op2));
-        clone2.setExecutionMode(op2.getExecutionMode());
-        clone2.recomputeSchema();
-        opRef2.setValue(clone2);
-        context.computeAndSetTypeEnvironmentForOperator(clone2);
-        return true;
-    }
-
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 06ef109..addfb2e 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -33,118 +33,136 @@
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
-public class AlgebricksMetaOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+public class AlgebricksMetaOperatorDescriptor extends
+		AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = 1L;
 
-    // array of factories for building the local runtime pipeline
-    private final AlgebricksPipeline pipeline;
+	// array of factories for building the local runtime pipeline
+	private final AlgebricksPipeline pipeline;
 
-    public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity, int outputArity,
-            IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] internalRecordDescriptors) {
-        super(spec, inputArity, outputArity);
-        if (outputArity == 1) {
-            this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
-        }
-        this.pipeline = new AlgebricksPipeline(runtimeFactories, internalRecordDescriptors);
-    }
+	public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec,
+			int inputArity, int outputArity,
+			IPushRuntimeFactory[] runtimeFactories,
+			RecordDescriptor[] internalRecordDescriptors) {
+		super(spec, inputArity, outputArity);
+		if (outputArity == 1) {
+			this.recordDescriptors[0] = internalRecordDescriptors[internalRecordDescriptors.length - 1];
+		}
+		this.pipeline = new AlgebricksPipeline(runtimeFactories,
+				internalRecordDescriptors);
+	}
 
-    public AlgebricksPipeline getPipeline() {
-        return pipeline;
-    }
+	public AlgebricksPipeline getPipeline() {
+		return pipeline;
+	}
 
-    @Override
-    public JSONObject toJSON() throws JSONException {
-        JSONObject json = super.toJSON();
-        json.put("micro-operators", pipeline.getRuntimeFactories());
-        return json;
-    }
+	@Override
+	public JSONObject toJSON() throws JSONException {
+		JSONObject json = super.toJSON();
+		json.put("micro-operators", pipeline.getRuntimeFactories());
+		return json;
+	}
 
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Asterix { \n");
-        for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
-            sb.append("  " + f.toString() + ";\n");
-        }
-        sb.append("}");
-        // sb.append(super.getInputArity());
-        // sb.append(";");
-        // sb.append(super.getOutputArity());
-        // sb.append(";");
-        return sb.toString();
-    }
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("Asterix { \n");
+		for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+			sb.append("  " + f.toString() + ";\n");
+		}
+		sb.append("}");
+		// sb.append(super.getInputArity());
+		// sb.append(";");
+		// sb.append(super.getOutputArity());
+		// sb.append(";");
+		return sb.toString();
+	}
 
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        if (inputArity == 0) {
-            return createSourceInputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-        } else {
-            return createOneInputOneOutputPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-        }
-    }
+	@Override
+	public IOperatorNodePushable createPushRuntime(
+			final IHyracksTaskContext ctx,
+			final IRecordDescriptorProvider recordDescProvider, int partition,
+			int nPartitions) {
+		if (inputArity == 0) {
+			return createSourceInputPushRuntime(ctx, recordDescProvider,
+					partition, nPartitions);
+		} else {
+			return createOneInputOneOutputPushRuntime(ctx, recordDescProvider,
+					partition, nPartitions);
+		}
+	}
 
-    private IOperatorNodePushable createSourceInputPushRuntime(final IHyracksTaskContext ctx,
-            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+	private IOperatorNodePushable createSourceInputPushRuntime(
+			final IHyracksTaskContext ctx,
+			final IRecordDescriptorProvider recordDescProvider, int partition,
+			int nPartitions) {
+		return new AbstractUnaryOutputSourceOperatorNodePushable() {
 
-            public void initialize() throws HyracksDataException {
-                IFrameWriter startOfPipeline;
-                RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
-                        : null;
+			public void initialize() throws HyracksDataException {
+				IFrameWriter startOfPipeline;
+				RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+						: null;
 
-                PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, null,
-                        pipelineOutputRecordDescriptor);
-                try {
-                    startOfPipeline = pa.assemblePipeline(writer, ctx);
-                } catch (AlgebricksException e) {
-                    throw new HyracksDataException(e);
-                }
-                startOfPipeline.open();
-                startOfPipeline.close();
-            }
-        };
-    }
+				PipelineAssembler pa = new PipelineAssembler(pipeline,
+						inputArity, outputArity, null,
+						pipelineOutputRecordDescriptor);
+				try {
+					startOfPipeline = pa.assemblePipeline(writer, ctx);
+				} catch (AlgebricksException e) {
+					throw new HyracksDataException(e);
+				}
+				startOfPipeline.open();
+				startOfPipeline.close();
+			}
+		};
+	}
 
-    private IOperatorNodePushable createOneInputOneOutputPushRuntime(final IHyracksTaskContext ctx,
-            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-        return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+	private IOperatorNodePushable createOneInputOneOutputPushRuntime(
+			final IHyracksTaskContext ctx,
+			final IRecordDescriptorProvider recordDescProvider, int partition,
+			int nPartitions) {
+		return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
 
-            private IFrameWriter startOfPipeline;
+			private IFrameWriter startOfPipeline;
 
-            @Override
-            public void open() throws HyracksDataException {
-                if (startOfPipeline == null) {
-                    RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
-                            : null;
-                    RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
-                            AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
-                    PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
-                            pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
-                    try {
-                        startOfPipeline = pa.assemblePipeline(writer, ctx);
-                    } catch (AlgebricksException ae) {
-                        throw new HyracksDataException(ae);
-                    }
-                }
-                startOfPipeline.open();
-            }
+			@Override
+			public void open() throws HyracksDataException {
+				if (startOfPipeline == null) {
+					RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
+							: null;
+					RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider
+							.getInputRecordDescriptor(
+									AlgebricksMetaOperatorDescriptor.this
+											.getActivityId(), 0);
+					PipelineAssembler pa = new PipelineAssembler(pipeline,
+							inputArity, outputArity,
+							pipelineInputRecordDescriptor,
+							pipelineOutputRecordDescriptor);
+					try {
+						startOfPipeline = pa.assemblePipeline(writer, ctx);
+					} catch (AlgebricksException ae) {
+						throw new HyracksDataException(ae);
+					}
+				}
+				startOfPipeline.open();
+			}
 
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                startOfPipeline.nextFrame(buffer);
-            }
+			@Override
+			public void nextFrame(ByteBuffer buffer)
+					throws HyracksDataException {
+				startOfPipeline.nextFrame(buffer);
+			}
 
-            @Override
-            public void close() throws HyracksDataException {
-                startOfPipeline.close();
-            }
+			@Override
+			public void close() throws HyracksDataException {
+				startOfPipeline.close();
+			}
 
-            @Override
-            public void fail() throws HyracksDataException {
-                startOfPipeline.fail();
-            }
-        };
-    }
+			@Override
+			public void fail() throws HyracksDataException {
+				startOfPipeline.fail();
+			}
+		};
+	}
 }
diff --git a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/logical/expression/HiveFunctionInfo.java b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/logical/expression/HiveFunctionInfo.java
index 4f4ca90..95b63b7 100644
--- a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/logical/expression/HiveFunctionInfo.java
+++ b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/logical/expression/HiveFunctionInfo.java
@@ -12,39 +12,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hivesterix.logical.expression;
-
-import java.io.Serializable;
-
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
-
-public class HiveFunctionInfo implements IFunctionInfo, Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * primary function identifier
-     */
-    private transient FunctionIdentifier fid;
-
-    /**
-     * secondary function identifier: function name
-     */
-    private transient Object secondaryFid;
-
-    public HiveFunctionInfo(FunctionIdentifier fid, Object secondFid) {
-        this.fid = fid;
-        this.secondaryFid = secondFid;
-    }
-
-    @Override
-    public FunctionIdentifier getFunctionIdentifier() {
-        return fid;
-    }
-
-    public Object getInfo() {
-        return secondaryFid;
-    }
-
-}
+package edu.uci.ics.hivesterix.logical.expression;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AbstractFunctionInfo;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class HiveFunctionInfo extends AbstractFunctionInfo {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * primary function identifier; declared transient, so Java serialization skips it
+     */
+    private transient FunctionIdentifier fid;
+
+    /**
+     * secondary function identifier: function name; also declared transient
+     */
+    private transient Object secondaryFid;
+
+    public HiveFunctionInfo(FunctionIdentifier fid, Object secondFid) {
+        super(true); // NOTE(review): presumably marks the function as "functional" (see isFunctional()) - confirm
+        this.fid = fid;
+        this.secondaryFid = secondFid;
+    }
+
+    @Override
+    public FunctionIdentifier getFunctionIdentifier() {
+        return fid;
+    }
+
+    public Object getInfo() {
+        return secondaryFid;
+    }
+
+}
diff --git a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
index 12b5986..1f31e44 100644
--- a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
+++ b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
@@ -38,7 +38,7 @@
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IntroduceGroupByCombinerRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.IsolateHyracksOperatorsRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PullSelectOutOfEqJoin;
-import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushLimitDownRule;
+import edu.uci.ics.hyracks.algebricks.rewriter.rules.CopyLimitDownRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectDownRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushProjectIntoDataSourceScanRule;
 import edu.uci.ics.hyracks.algebricks.rewriter.rules.PushSelectDownRule;
@@ -107,7 +107,7 @@
         PHYSICAL_PLAN_REWRITES.add(new EnforceStructuralPropertiesRule());
         PHYSICAL_PLAN_REWRITES.add(new PushProjectDownRule());
         PHYSICAL_PLAN_REWRITES.add(new SetAlgebricksPhysicalOperatorsRule());
-        PHYSICAL_PLAN_REWRITES.add(new PushLimitDownRule());
+        PHYSICAL_PLAN_REWRITES.add(new CopyLimitDownRule());
         PHYSICAL_PLAN_REWRITES.add(new InsertProjectBeforeWriteRule());
         PHYSICAL_PLAN_REWRITES.add(new InsertProjectBeforeUnionRule());
     }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
index 7a795da..015e3a9 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/ICCApplicationContext.java
@@ -58,4 +58,5 @@
      * @return The Cluster Controller Context.
      */
     public ICCContext getCCContext();
+   
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java
index 51db13e..7225969 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IClusterLifecycleListener.java
@@ -3,9 +3,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
index 3d70a40..c94fa9a 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/INCApplicationContext.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.api.application;
 
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import edu.uci.ics.hyracks.api.resources.memory.IMemoryManager;
 
 /**
@@ -24,7 +25,14 @@
  */
 public interface INCApplicationContext extends IApplicationContext {
     /**
-     * Gets the node Id of the Node Congtroller.
+     * Gets the life cycle component manager of the Node Controller.
+     * 
+     * @return the life cycle component manager
+     */
+    public ILifeCycleComponentManager getLifeCycleComponentManager();
+
+    /**
+     * Gets the node Id of the Node Controller.
      * 
      * @return the Node Id.
      */
@@ -58,4 +66,11 @@
      * @return Memory Manager
      */
     public IMemoryManager getMemoryManager();
+
+    /**
+     * Set the handler for state dumps.
+     * 
+     * @param handler the handler to use for state dumps
+     */
+    public void setStateDumpHandler(IStateDumpHandler handler);
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IStateDumpHandler.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IStateDumpHandler.java
new file mode 100644
index 0000000..29d5af4
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/application/IStateDumpHandler.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.api.application;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Handler for state dumps; registered through INCApplicationContext.setStateDumpHandler.
+ */
+public interface IStateDumpHandler {
+    /**
+     * Implementations are expected to write their state dump to the given stream.
+     *
+     * @param os
+     *            stream that receives the dump
+     * @throws IOException
+     *             declared so that implementations can propagate I/O failures
+     */
+    public void dumpState(OutputStream os) throws IOException;
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 55d80e2..9ff741e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -31,6 +31,7 @@
         GET_CLUSTER_TOPOLOGY,
         CREATE_JOB,
         GET_JOB_STATUS,
+        GET_JOB_INFO,
         START_JOB,
         GET_DATASET_DIRECTORY_SERIVICE_INFO,
         GET_DATASET_RESULT_STATUS,
@@ -76,6 +77,25 @@
         }
     }
 
+    public static class GetJobInfoFunction extends Function {
+        private static final long serialVersionUID = 1L;
+        // Id of the job whose info is requested; the function is tagged GET_JOB_INFO.
+        private final JobId jobId;
+
+        public GetJobInfoFunction(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.GET_JOB_INFO;
+        }
+
+        public JobId getJobId() {
+            return jobId;
+        }
+    }
+
     public static class StartJobFunction extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
index 2bac49f..98f27f2 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksClientInterfaceRemoteProxy.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 import edu.uci.ics.hyracks.ipc.api.IIPCHandle;
@@ -103,4 +104,11 @@
                 deploymentId);
         rpci.call(ipcHandle, dbf);
     }
+
+    @Override
+    public JobInfo getJobInfo(JobId jobId) throws Exception {
+        // Wrap the job id in a GetJobInfoFunction, hand it to rpci.call together
+        // with the IPC handle, and cast whatever comes back to JobInfo.
+        return (JobInfo) rpci.call(ipcHandle, new HyracksClientInterfaceFunctions.GetJobInfoFunction(jobId));
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
index 98836cd..1916360 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/HyracksConnection.java
@@ -36,6 +36,7 @@
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
@@ -148,7 +149,7 @@
                 binaryURLs.add(new URL(url));
             }
         }
-        /**deploy the URLs to the CC and NCs*/
+        /** deploy the URLs to the CC and NCs */
         hci.deployBinary(binaryURLs, deploymentId);
         return deploymentId;
     }
@@ -176,4 +177,9 @@
             EnumSet<JobFlag> jobFlags) throws Exception {
         return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
     }
+
+    @Override
+    public JobInfo getJobInfo(JobId jobId) throws Exception {
+        return hci.getJobInfo(jobId);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
index 2820cdf..1e44e91 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientConnection.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
@@ -44,6 +45,16 @@
     public JobStatus getJobStatus(JobId jobId) throws Exception;
 
     /**
+     * Gets detailed information about the specified Job.
+     * 
+     * @param jobId
+     *            JobId of the Job
+     * @return {@link JobInfo}
+     * @throws Exception
+     */
+    public JobInfo getJobInfo(JobId jobId) throws Exception;
+
+    /**
      * Start the specified Job.
      * 
      * @param appName
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
index 737152b..d0eeada 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/IHyracksClientInterface.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 
@@ -46,4 +47,7 @@
     public void unDeployBinary(DeploymentId deploymentId) throws Exception;
 
     public JobId startJob(DeploymentId deploymentId, byte[] acggfBytes, EnumSet<JobFlag> jobFlags) throws Exception;
+
+    public JobInfo getJobInfo(JobId jobId) throws Exception;
+
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 79efaa0..48d7275 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -43,6 +43,11 @@
     }
 
     @Override
+    public JobSpecification getJobSpecification() {
+        return spec;
+    }
+
+    @Override
     public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
             final ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException {
         final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(spec, jobFlags);
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
index 978348b..99a0712 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/IActivityClusterGraphGeneratorFactory.java
@@ -23,4 +23,6 @@
 public interface IActivityClusterGraphGeneratorFactory extends Serializable {
     public IActivityClusterGraphGenerator createActivityClusterGraphGenerator(JobId jobId,
             ICCApplicationContext ccAppCtx, EnumSet<JobFlag> jobFlags) throws HyracksException;
+
+    public JobSpecification getJobSpecification();
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
new file mode 100644
index 0000000..812a32a
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobInfo.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.hyracks.api.job;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+
+/**
+ * Serializable description of a job: its id, its current and pending status, the exceptions
+ * recorded for each, and the locations registered for each of its operators.
+ */
+public class JobInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final JobId jobId;
+
+    private JobStatus status;
+
+    private List<Exception> exceptions;
+
+    private JobStatus pendingStatus;
+
+    private List<Exception> pendingExceptions;
+
+    private Map<OperatorDescriptorId, List<String>> operatorLocations;
+
+    /**
+     * Creates the info for a job with its initial status and operator locations.
+     *
+     * @param jobId
+     *            id of the job this info describes
+     * @param jobStatus
+     *            status of the job at the time the info is created
+     * @param operatorLocations
+     *            locations recorded for each operator of the job
+     */
+    public JobInfo(JobId jobId, JobStatus jobStatus, Map<OperatorDescriptorId, List<String>> operatorLocations) {
+        this.jobId = jobId;
+        this.operatorLocations = operatorLocations;
+        // The parameter is named jobStatus; assigning "status" here would only copy the field onto
+        // itself and leave it null.
+        this.status = jobStatus;
+    }
+
+    public JobStatus getStatus() {
+        return status;
+    }
+
+    public void setStatus(JobStatus status) {
+        this.status = status;
+    }
+
+    public List<Exception> getExceptions() {
+        return exceptions;
+    }
+
+    public void setExceptions(List<Exception> exceptions) {
+        this.exceptions = exceptions;
+    }
+
+    public JobStatus getPendingStatus() {
+        return pendingStatus;
+    }
+
+    public void setPendingStatus(JobStatus pendingStatus) {
+        this.pendingStatus = pendingStatus;
+    }
+
+    public List<Exception> getPendingExceptions() {
+        return pendingExceptions;
+    }
+
+    public void setPendingExceptions(List<Exception> pendingExceptions) {
+        this.pendingExceptions = pendingExceptions;
+    }
+
+    public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
+        return operatorLocations;
+    }
+
+    public void setOperatorLocations(Map<OperatorDescriptorId, List<String>> operatorLocations) {
+        this.operatorLocations = operatorLocations;
+    }
+
+    public JobId getJobId() {
+        return jobId;
+    }
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
index 2cb757f..2538510 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponent.java
@@ -21,5 +21,7 @@
 
     public void start();
 
+    public void dumpState(OutputStream os) throws IOException;
+
     public void stop(boolean dumpState, OutputStream ouputStream) throws IOException;
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java
index 6247c17..b3b2566 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/ILifeCycleComponentManager.java
@@ -15,6 +15,7 @@
 package edu.uci.ics.hyracks.api.lifecycle;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.Map;
 
@@ -24,7 +25,11 @@
 
     public void startAll();
 
+    public void dumpState(OutputStream os) throws IOException;
+
     public void stopAll(boolean dumpState) throws IOException;
 
     public void configure(Map<String, String> configuration);
+
+    public String getDumpPath();
 }
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
index ad708e9..70eed11 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/lifecycle/LifeCycleComponentManager.java
@@ -17,6 +17,7 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -25,8 +26,6 @@
 
 public class LifeCycleComponentManager implements ILifeCycleComponentManager {
 
-    public final static LifeCycleComponentManager INSTANCE = new LifeCycleComponentManager();
-
     public static final class Config {
         public static final String DUMP_PATH_KEY = "DUMP_PATH";
     }
@@ -39,7 +38,7 @@
     private String dumpPath;
     private boolean configured;
 
-    private LifeCycleComponentManager() {
+    public LifeCycleComponentManager() {
         components = new ArrayList<ILifeCycleComponent>();
         stopInitiated = false;
         configured = false;
@@ -150,6 +149,19 @@
         configured = true;
     }
 
+    @Override
+    public String getDumpPath() {
+        return dumpPath;
+    }
+
+    @Override
+    public void dumpState(OutputStream os) throws IOException {
+        for (int index = components.size() - 1; index >= 0; index--) {
+            ILifeCycleComponent component = components.get(index);
+            component.dumpState(os);
+        }
+    }
+
     public boolean stoppedAll() {
         return stopped;
     }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 0463350..8184fe6 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -42,6 +42,7 @@
 import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
@@ -53,8 +54,10 @@
 import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
 import edu.uci.ics.hyracks.control.cc.work.CliDeployBinaryWork;
 import edu.uci.ics.hyracks.control.cc.work.CliUnDeployBinaryWork;
+import edu.uci.ics.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
 import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
+import edu.uci.ics.hyracks.control.cc.work.GetJobInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
 import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
@@ -63,6 +66,7 @@
 import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
 import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
 import edu.uci.ics.hyracks.control.cc.work.NotifyDeployBinaryWork;
+import edu.uci.ics.hyracks.control.cc.work.NotifyStateDumpResponse;
 import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
 import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
 import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionRequestWork;
@@ -80,6 +84,7 @@
 import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
 import edu.uci.ics.hyracks.control.common.deployment.DeploymentRun;
 import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
 import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.Function;
 import edu.uci.ics.hyracks.control.common.logs.LogFile;
 import edu.uci.ics.hyracks.control.common.work.IPCResponder;
@@ -136,6 +141,8 @@
 
     private final Map<DeploymentId, DeploymentRun> deploymentRunMap;
 
+    private final Map<String, StateDumpRun> stateDumpRunMap;
+
     public ClusterControllerService(final CCConfig ccConfig) throws Exception {
         this.ccConfig = ccConfig;
         File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
@@ -192,6 +199,7 @@
         jobCounter = 0;
 
         deploymentRunMap = new HashMap<DeploymentId, DeploymentRun>();
+        stateDumpRunMap = new HashMap<>();
     }
 
     private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
@@ -350,6 +358,13 @@
                     return;
                 }
 
+                case GET_JOB_INFO: {
+                    HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
+                    workQueue.schedule(new GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
+                            new IPCResponder<JobInfo>(handle, mid)));
+                    return;
+                }
+
                 case START_JOB: {
                     HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
                     JobId jobId = createJobId();
@@ -540,11 +555,30 @@
                             }));
                     return;
                 }
+
+                case STATE_DUMP_RESPONSE: {
+                    CCNCFunctions.StateDumpResponseFunction dsrf = (StateDumpResponseFunction) fn;
+                    workQueue.schedule(new NotifyStateDumpResponse(ClusterControllerService.this, dsrf.getNodeId(),
+                            dsrf.getStateDumpId(), dsrf.getState()));
+                    return;
+                }
             }
             LOGGER.warning("Unknown function: " + fn.getFunctionId());
         }
     }
 
+    public synchronized void addStateDumpRun(String id, StateDumpRun sdr) {
+        stateDumpRunMap.put(id, sdr);
+    }
+
+    public synchronized StateDumpRun getStateDumpRun(String id) {
+        return stateDumpRunMap.get(id);
+    }
+
+    public synchronized void removeStateDumpRun(String id) {
+        stateDumpRunMap.remove(id);
+    }
+
     /**
      * Add a deployment run
      * 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index bae0eb5..dab05f7 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -16,6 +16,7 @@
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,6 +30,7 @@
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
@@ -87,6 +89,8 @@
 
     private List<Exception> pendingExceptions;
 
+    private Map<OperatorDescriptorId, List<String>> operatorLocations;
+
     public JobRun(ClusterControllerService ccs, DeploymentId deploymentId, JobId jobId,
             IActivityClusterGraphGenerator acgg, EnumSet<JobFlag> jobFlags) {
         this.deploymentId = deploymentId;
@@ -101,6 +105,7 @@
         cleanupPendingNodeIds = new HashSet<String>();
         profile = new JobProfile(jobId);
         connectorPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+        operatorLocations = new HashMap<OperatorDescriptorId, List<String>>();
     }
 
     public DeploymentId getDeploymentId() {
@@ -178,6 +183,15 @@
         this.endTime = endTime;
     }
 
+    public void registerOperatorLocation(OperatorDescriptorId op, String location) {
+        List<String> locations = operatorLocations.get(op);
+        if (locations == null) {
+            locations = new ArrayList<String>();
+            operatorLocations.put(op, locations);
+        }
+        locations.add(location);
+    }
+
     @Override
     public synchronized void waitForCompletion() throws Exception {
         while (status != JobStatus.TERMINATED && status != JobStatus.FAILURE) {
@@ -348,8 +362,7 @@
                                 taskAttempt.put("end-time", ta.getEndTime());
                                 List<Exception> exceptions = ta.getExceptions();
                                 if (exceptions != null && !exceptions.isEmpty()) {
-                                    List<Exception> filteredExceptions = ExceptionUtils
-                                            .getActualExceptions(exceptions);
+                                    List<Exception> filteredExceptions = ExceptionUtils.getActualExceptions(exceptions);
                                     for (Exception exception : filteredExceptions) {
                                         StringWriter exceptionWriter = new StringWriter();
                                         exception.printStackTrace(new PrintWriter(exceptionWriter));
@@ -379,4 +392,8 @@
 
         return result;
     }
-}
\ No newline at end of file
+
+    public Map<OperatorDescriptorId, List<String>> getOperatorLocations() {
+        return operatorLocations;
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index fd6360a..d2c018f 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -35,6 +35,7 @@
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
 import edu.uci.ics.hyracks.api.dataflow.ActivityId;
 import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -92,6 +93,7 @@
 
     public void startJob() throws HyracksException {
         startRunnableActivityClusters();
+        ccs.getApplicationContext().notifyJobStart(jobRun.getJobId());
     }
 
     private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Collection<ActivityCluster> roots)
@@ -326,6 +328,8 @@
                 tads = new ArrayList<TaskAttemptDescriptor>();
                 taskAttemptMap.put(nodeId, tads);
             }
+            OperatorDescriptorId opId = tid.getActivityId().getOperatorDescriptorId();
+            jobRun.registerOperatorLocation(opId, nodeId);
             ActivityPartitionDetails apd = ts.getActivityPlan().getActivityPartitionDetails();
             TaskAttemptDescriptor tad = new TaskAttemptDescriptor(taskAttempt.getTaskAttemptId(),
                     apd.getPartitionCount(), apd.getInputPartitionCounts(), apd.getOutputPartitionCounts());
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/StateDumpRESTAPIFunction.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/StateDumpRESTAPIFunction.java
new file mode 100644
index 0000000..cd79eb3
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/StateDumpRESTAPIFunction.java
@@ -0,0 +1,33 @@
+package edu.uci.ics.hyracks.control.cc.web;
+
+import java.util.Map;
+
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.web.util.IJSONOutputFunction;
+import edu.uci.ics.hyracks.control.cc.work.GatherStateDumpsWork;
+import edu.uci.ics.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
+
+public class StateDumpRESTAPIFunction implements IJSONOutputFunction {
+    private final ClusterControllerService ccs;
+
+    public StateDumpRESTAPIFunction(ClusterControllerService ccs) {
+        this.ccs = ccs;
+    }
+
+    @Override
+    public JSONObject invoke(String[] arguments) throws Exception {
+        GatherStateDumpsWork gsdw = new GatherStateDumpsWork(ccs);
+        ccs.getWorkQueue().scheduleAndSync(gsdw);
+        StateDumpRun sdr = gsdw.getStateDumpRun();
+        sdr.waitForCompletion();
+
+        JSONObject result = new JSONObject();
+        for (Map.Entry<String, String> e : sdr.getStateDump().entrySet()) {
+            result.put(e.getKey(), e.getValue());
+        }
+        return result;
+    }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
index dc59633..c100111 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/WebServer.java
@@ -65,6 +65,7 @@
         RoutingHandler rh = new RoutingHandler();
         rh.addHandler("jobs", new JSONOutputRequestHandler(new JobsRESTAPIFunction(ccs)));
         rh.addHandler("nodes", new JSONOutputRequestHandler(new NodesRESTAPIFunction(ccs)));
+        rh.addHandler("statedump", new JSONOutputRequestHandler(new StateDumpRESTAPIFunction(ccs)));
         handler.setHandler(rh);
         addHandler(handler);
 
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GatherStateDumpsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GatherStateDumpsWork.java
new file mode 100644
index 0000000..06e3758
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GatherStateDumpsWork.java
@@ -0,0 +1,118 @@
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+/**
+ * Work item that asks every node controller known to the cluster controller to dump its state and
+ * tracks the outstanding responses in a {@link StateDumpRun}.
+ */
+public class GatherStateDumpsWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+
+    private final StateDumpRun sdr;
+
+    public GatherStateDumpsWork(ClusterControllerService ccs) {
+        this.ccs = ccs;
+        this.sdr = new StateDumpRun(ccs);
+    }
+
+    /**
+     * Registers the run with the cluster controller, records the ids of the nodes expected to
+     * answer, and sends a state dump request to each node controller.
+     */
+    @Override
+    public void doRun() throws Exception {
+        ccs.addStateDumpRun(sdr.stateDumpId, sdr);
+        sdr.setNCs(new HashSet<>(ccs.getNodeMap().keySet()));
+        for (NodeControllerState ncs : ccs.getNodeMap().values()) {
+            ncs.getNodeController().dumpState(sdr.stateDumpId);
+        }
+    }
+
+    public StateDumpRun getStateDumpRun() {
+        return sdr;
+    }
+
+    /**
+     * Collects the state dumps of one request, keyed by node id, and lets callers block until every
+     * expected node has answered.
+     */
+    public static class StateDumpRun {
+
+        private final ClusterControllerService ccs;
+
+        private final String stateDumpId;
+
+        private final Map<String, String> ncStates;
+
+        private Set<String> ncIds;
+
+        private boolean complete;
+
+        public StateDumpRun(ClusterControllerService ccs) {
+            this.ccs = ccs;
+            stateDumpId = UUID.randomUUID().toString();
+            ncStates = new HashMap<>();
+            complete = false;
+        }
+
+        /**
+         * Sets the ids of the nodes whose responses are awaited.
+         *
+         * @param ncIds
+         *            ids of the nodes expected to respond; an empty set completes the run at once
+         */
+        public synchronized void setNCs(Set<String> ncIds) {
+            this.ncIds = ncIds;
+            if (ncIds.isEmpty()) {
+                // No response will ever arrive, so finish now instead of leaving waiters blocked.
+                markComplete();
+            }
+        }
+
+        public Map<String, String> getStateDump() {
+            return ncStates;
+        }
+
+        /**
+         * Records the state reported by a node and completes the run once no node is outstanding.
+         */
+        public synchronized void notifyStateDumpReceived(String nodeId, String state) {
+            ncIds.remove(nodeId);
+            ncStates.put(nodeId, state);
+            if (ncIds.isEmpty()) {
+                markComplete();
+            }
+        }
+
+        /** Blocks the caller until the run has been marked complete. */
+        public synchronized void waitForCompletion() throws InterruptedException {
+            while (!complete) {
+                wait();
+            }
+        }
+
+        public String getStateDumpId() {
+            return stateDumpId;
+        }
+
+        /**
+         * Marks the run complete, unregisters it from the cluster controller and wakes all waiters.
+         * Callers must hold this object's monitor.
+         */
+        private void markComplete() {
+            complete = true;
+            ccs.removeStateDumpRun(stateDumpId);
+            notifyAll();
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java
new file mode 100644
index 0000000..c2e08f2
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobInfoWork.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.cc.work;
+
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobInfo;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.job.JobRun;
+import edu.uci.ics.hyracks.control.common.work.IResultCallback;
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+
+public class GetJobInfoWork extends SynchronizableWork {
+    private final ClusterControllerService ccs;
+    private final JobId jobId;
+    private final IResultCallback<JobInfo> callback;
+
+    public GetJobInfoWork(ClusterControllerService ccs, JobId jobId, IResultCallback<JobInfo> callback) {
+        this.ccs = ccs;
+        this.jobId = jobId;
+        this.callback = callback;
+    }
+
+    @Override
+    protected void doRun() throws Exception {
+        try {
+            JobRun run = ccs.getActiveRunMap().get(jobId);
+            if (run == null) {
+                run = ccs.getRunMapArchive().get(jobId);
+            }
+            JobInfo info = run == null ? null
+                    : new JobInfo(run.getJobId(), run.getStatus(), run.getOperatorLocations());
+            callback.setValue(info);
+        } catch (Exception e) {
+            callback.setException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyStateDumpResponse.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyStateDumpResponse.java
new file mode 100644
index 0000000..617bf6a
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/NotifyStateDumpResponse.java
@@ -0,0 +1,48 @@
+package edu.uci.ics.hyracks.control.cc.work;
+
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
+import edu.uci.ics.hyracks.control.common.work.AbstractWork;
+
+/**
+ * Work item that hands a node's state dump to the {@link StateDumpRun} registered with the
+ * cluster controller under the given state dump id.
+ */
+public class NotifyStateDumpResponse extends AbstractWork {
+
+    private static final Logger LOGGER = Logger.getLogger(NotifyStateDumpResponse.class.getName());
+
+    private final ClusterControllerService ccs;
+
+    private final String stateDumpId;
+
+    private final String nodeId;
+
+    private final String state;
+
+    public NotifyStateDumpResponse(ClusterControllerService ccs, String nodeId, String stateDumpId, String state) {
+        this.ccs = ccs;
+        this.stateDumpId = stateDumpId;
+        this.nodeId = nodeId;
+        this.state = state;
+    }
+
+    /**
+     * Looks up the run for the state dump id and passes it the node's state; a response whose run
+     * is not registered is logged and dropped.
+     */
+    @Override
+    public void run() {
+        StateDumpRun sdr = ccs.getStateDumpRun(stateDumpId);
+        if (sdr == null) {
+            // The lookup returns null when no run is registered under this id, e.g. for a response
+            // that arrives after the run was removed; dereferencing it would throw.
+            LOGGER.warning("Ignoring state dump response from node " + nodeId + " for unknown state dump "
+                    + stateDumpId);
+            return;
+        }
+        sdr.notifyStateDumpReceived(nodeId, state);
+    }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index ec96f2b..926c83f 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -44,6 +44,8 @@
 
     public void notifyDeployBinary(DeploymentId deploymentId, String nodeId, DeploymentStatus status) throws Exception;
 
+    public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception;
+
     public void nodeHeartbeat(String id, HeartbeatData hbData) throws Exception;
 
     public void reportProfile(String id, List<JobProfile> profiles) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 464e2b2..ec95d58 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -44,4 +44,6 @@
     public void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
 
     public void undeployBinary(DeploymentId deploymentId) throws Exception;
+
+    public void dumpState(String stateDumpId) throws Exception;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 43129f5..1d4daf7 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -123,4 +123,5 @@
         }
 
     }
-}
\ No newline at end of file
+}
+
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index e5d8e5f..9077cd8 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -88,6 +88,9 @@
         NOTIFY_DEPLOY_BINARY,
         UNDEPLOY_BINARY,
 
+        STATE_DUMP_REQUEST,
+        STATE_DUMP_RESPONSE,
+
         OTHER
     }
 
@@ -845,6 +848,60 @@
         }
     }
 
+    public static class StateDumpRequestFunction extends Function {
+        private static final long serialVersionUID = 1L;
+
+        private final String stateDumpId;
+
+        public StateDumpRequestFunction(String stateDumpId) {
+            this.stateDumpId = stateDumpId;
+        }
+
+        public String getStateDumpId() {
+            return stateDumpId;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.STATE_DUMP_REQUEST;
+        }
+
+    }
+
+    public static class StateDumpResponseFunction extends Function {
+
+        private static final long serialVersionUID = 1L;
+
+        private final String nodeId;
+
+        private final String stateDumpId;
+
+        private final String state;
+
+        public StateDumpResponseFunction(String nodeId, String stateDumpId, String state) {
+            this.nodeId = nodeId;
+            this.stateDumpId = stateDumpId;
+            this.state = state;
+        }
+
+        public String getNodeId() {
+            return nodeId;
+        }
+
+        public String getStateDumpId() {
+            return stateDumpId;
+        }
+
+        public String getState() {
+            return state;
+        }
+
+        @Override
+        public FunctionId getFunctionId() {
+            return FunctionId.STATE_DUMP_RESPONSE;
+        }
+    }
+
     public static class SerializerDeserializer implements IPayloadSerializerDeserializer {
         private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 0a16087..b52d0ae 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -138,4 +138,11 @@
         ipcHandle.send(-1, new CCNCFunctions.GetNodeControllersInfoFunction(), null);
     }
 
+    @Override
+    public void notifyStateDump(String nodeId, String stateDumpId, String state) throws Exception {
+        CCNCFunctions.StateDumpResponseFunction fn = new CCNCFunctions.StateDumpResponseFunction(nodeId, stateDumpId,
+                state);
+        ipcHandle.send(-1, fn, null);
+    }
+
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index e078a91..2484a98 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -78,4 +78,10 @@
         CCNCFunctions.UnDeployBinaryFunction rpaf = new CCNCFunctions.UnDeployBinaryFunction(deploymentId);
         ipcHandle.send(-1, rpaf, null);
     }
+
+    @Override
+    public void dumpState(String stateDumpId) throws Exception {
+        CCNCFunctions.StateDumpRequestFunction dsf = new CCNCFunctions.StateDumpRequestFunction(stateDumpId);
+        ipcHandle.send(-1, dsf, null);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
index fce7180..8c88627 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NCDriver.java
@@ -19,7 +19,6 @@
 
 import org.kohsuke.args4j.CmdLineParser;
 
-import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 
 public class NCDriver {
@@ -39,9 +38,9 @@
 
             final NodeControllerService nService = new NodeControllerService(ncConfig);
             if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.severe("Setting uncaught exception handler " + LifeCycleComponentManager.INSTANCE);
+                LOGGER.severe("Setting uncaught exception handler " + nService.getLifeCycleComponentManager());
             }
-            Thread.currentThread().setUncaughtExceptionHandler(LifeCycleComponentManager.INSTANCE);
+            Thread.currentThread().setUncaughtExceptionHandler(nService.getLifeCycleComponentManager());
             nService.start();
             Runtime.getRuntime().addShutdownHook(new Thread() {
                 @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 245d988..60c4fba 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -50,6 +50,8 @@
 import edu.uci.ics.hyracks.api.deployment.DeploymentId;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
@@ -59,6 +61,7 @@
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatData;
 import edu.uci.ics.hyracks.control.common.heartbeat.HeartbeatSchema;
 import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions;
+import edu.uci.ics.hyracks.control.common.ipc.CCNCFunctions.StateDumpRequestFunction;
 import edu.uci.ics.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.work.FutureValue;
@@ -76,6 +79,7 @@
 import edu.uci.ics.hyracks.control.nc.work.BuildJobProfilesWork;
 import edu.uci.ics.hyracks.control.nc.work.CleanupJobletWork;
 import edu.uci.ics.hyracks.control.nc.work.DeployBinaryWork;
+import edu.uci.ics.hyracks.control.nc.work.StateDumpWork;
 import edu.uci.ics.hyracks.control.nc.work.ReportPartitionAvailabilityWork;
 import edu.uci.ics.hyracks.control.nc.work.StartTasksWork;
 import edu.uci.ics.hyracks.control.nc.work.UnDeployBinaryWork;
@@ -130,6 +134,8 @@
 
     private INCApplicationEntryPoint ncAppEntryPoint;
 
+    private final ILifeCycleComponentManager lccm;
+
     private final MemoryMXBean memoryMXBean;
 
     private final List<GarbageCollectorMXBean> gcMXBeans;
@@ -160,6 +166,7 @@
         partitionManager = new PartitionManager(this);
         netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
 
+        lccm = new LifeCycleComponentManager();
         queue = new WorkQueue();
         jobletMap = new Hashtable<JobId, Joblet>();
         timer = new Timer(true);
@@ -172,7 +179,7 @@
         osMXBean = ManagementFactory.getOperatingSystemMXBean();
         registrationPending = true;
         getNodeControllerInfosAcceptor = new MutableObject<FutureValue<Map<String, NodeControllerInfo>>>();
-        memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
+        memoryManager = new MemoryManager(Long.MAX_VALUE);
     }
 
     public IHyracksRootContext getRootContext() {
@@ -183,6 +190,10 @@
         return appCtx;
     }
 
+    public ILifeCycleComponentManager getLifeCycleComponentManager() {
+        return lccm; // created when this service is constructed and also handed to the NCApplicationContext
+    }
+
     private static List<IODeviceHandle> getDevices(String ioDevices) {
         List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
         StringTokenizer tok = new StringTokenizer(ioDevices, ",");
@@ -286,7 +297,7 @@
     }
 
     private void startApplication() throws Exception {
-        appCtx = new NCApplicationContext(serverCtx, ctx, id, memoryManager);
+        appCtx = new NCApplicationContext(serverCtx, ctx, id, memoryManager, lccm);
         String className = ncConfig.appNCMainClass;
         if (className != null) {
             Class<?> c = Class.forName(className);
@@ -462,7 +473,8 @@
 
     private final class NodeControllerIPCI implements IIPCI {
         @Override
-        public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+        public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
+                Exception exception) {
             CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
             switch (fn.getFunctionId()) {
                 case SEND_APPLICATION_MESSAGE: {
@@ -522,6 +534,12 @@
                     queue.schedule(new UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
                     return;
                 }
+
+                case STATE_DUMP_REQUEST: {
+                    final CCNCFunctions.StateDumpRequestFunction dsrf = (StateDumpRequestFunction) fn;
+                    queue.schedule(new StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
+                    return;
+                }
             }
             throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId());
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
index 4b8eb53..a6c2cc4 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/application/NCApplicationContext.java
@@ -15,27 +15,45 @@
 package edu.uci.ics.hyracks.control.nc.application;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Serializable;
 
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.IStateDumpHandler;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import edu.uci.ics.hyracks.api.resources.memory.IMemoryManager;
 import edu.uci.ics.hyracks.control.common.application.ApplicationContext;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 import edu.uci.ics.hyracks.control.nc.resources.memory.MemoryManager;
 
 public class NCApplicationContext extends ApplicationContext implements INCApplicationContext {
+    private final ILifeCycleComponentManager lccm;
     private final String nodeId;
     private final IHyracksRootContext rootCtx;
     private final MemoryManager memoryManager;
     private Object appObject;
+    private IStateDumpHandler sdh;
 
     public NCApplicationContext(ServerContext serverCtx, IHyracksRootContext rootCtx, String nodeId,
-            MemoryManager memoryManager) throws IOException {
+            MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager) throws IOException {
         super(serverCtx);
+        this.lccm = lifeCyclecomponentManager;
         this.nodeId = nodeId;
         this.rootCtx = rootCtx;
         this.memoryManager = memoryManager;
+        sdh = new IStateDumpHandler() {
+            // Default handler: forwards the dump request to the life cycle component manager.
+            @Override
+            public void dumpState(OutputStream os) throws IOException {
+                lccm.dumpState(os);
+            }
+        };
+    }
+
+    @Override
+    public ILifeCycleComponentManager getLifeCycleComponentManager() {
+        return lccm; // the manager supplied to the constructor
     }
 
     @Override
@@ -48,6 +66,25 @@
     }
 
+    /**
+     * Replaces the handler that produces this node's state dump.
+     *
+     * @param handler
+     *            handler returned by subsequent calls to {@link #getStateDumpHandler()}
+     */
     @Override
+    public void setStateDumpHandler(IStateDumpHandler handler) {
+        this.sdh = handler;
+    }
+
+    /**
+     * @return the current state dump handler: the one installed by the constructor, which delegates to the life
+     *         cycle component manager, unless {@link #setStateDumpHandler(IStateDumpHandler)} replaced it
+     */
+    public IStateDumpHandler getStateDumpHandler() {
+        return sdh;
+    }
+
+    @Override
     public IHyracksRootContext getRootContext() {
         return rootCtx;
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 15db1fe..0e63485 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -139,7 +139,7 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("open(" + pid + " by " + taId);
         }
-        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString().replace(":", "$"));
         handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
                 IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
         size = 0;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StateDumpWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StateDumpWork.java
new file mode 100644
index 0000000..0adea86
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StateDumpWork.java
@@ -0,0 +1,38 @@
+package edu.uci.ics.hyracks.control.nc.work;
+
+import java.io.ByteArrayOutputStream;
+
+import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+/**
+ * Work item that captures this node controller's state dump and reports it to the cluster controller.
+ */
+public class StateDumpWork extends SynchronizableWork {
+    private final NodeControllerService ncs;
+
+    private final String stateDumpId;
+
+    /**
+     * @param ncs
+     *            node controller whose application state is dumped
+     * @param stateDumpId
+     *            identifier sent back together with the dump
+     */
+    public StateDumpWork(NodeControllerService ncs, String stateDumpId) {
+        this.ncs = ncs;
+        this.stateDumpId = stateDumpId;
+    }
+
+    /**
+     * Lets the application context's state dump handler write into an in-memory buffer, then sends the buffer,
+     * decoded as UTF-8, to the cluster controller together with the node id and the dump id.
+     */
+    @Override
+    protected void doRun() throws Exception {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        ncs.getApplicationContext().getStateDumpHandler().dumpState(buffer);
+        String state = buffer.toString("UTF-8");
+        ncs.getClusterController().notifyStateDump(ncs.getApplicationContext().getNodeId(), stateDumpId, state);
+    }
+}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
index 5ac2961..d9b4984 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -101,8 +101,8 @@
     protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
     protected final static String sep = System.getProperty("file.separator");
     protected final String dateString = simpleDateFormat.format(new Date());
-    protected final String primaryFileName = System.getProperty("java.io.tmpdir") + sep + "primaryBtree" + dateString;
-    protected final String btreeFileName = System.getProperty("java.io.tmpdir") + sep + "invIndexBtree" + dateString;
+    protected final String primaryFileName = "primaryBtree" + dateString;
+    protected final String btreeFileName = "invIndexBtree" + dateString;
 
     protected IFileSplitProvider primaryFileSplitProvider = new ConstantFileSplitProvider(
             new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
@@ -194,7 +194,7 @@
 
     private IOperatorDescriptor createFileScanOp(JobSpecification spec) {
         FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/cleanednumbereddblptitles.txt"))) };
+                "data" + File.separator + "cleanednumbereddblptitles.txt"))) };
         IFileSplitProvider dblpTitleSplitProvider = new ConstantFileSplitProvider(dblpTitleFileSplits);
         RecordDescriptor dblpTitleRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index 39fa19b..7356644 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -80,7 +80,7 @@
         ccConfig.clusterNetIpAddress = "127.0.0.1";
         ccConfig.clusterNetPort = 39001;
         ccConfig.profileDumpPeriod = 10000;
-        File outDir = new File("target/ClusterController");
+        File outDir = new File("target" + File.separator + "ClusterController");
         outDir.mkdirs();
         File ccRoot = File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir);
         ccRoot.delete();
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 22ff84e..ddaa7cf 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -77,7 +77,7 @@
         ccConfig.clusterNetIpAddress = "127.0.0.1";
         ccConfig.clusterNetPort = 39001;
         ccConfig.profileDumpPeriod = 10000;
-        File outDir = new File("target/ClusterController");
+        File outDir = new File("target" + File.separator + "ClusterController");
         outDir.mkdirs();
         File ccRoot = File.createTempFile(AbstractMultiNCIntegrationTest.class.getName(), ".data", outDir);
         ccRoot.delete();
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index 052ea67..8543433 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -215,7 +215,7 @@
         }
     }
 
-    private void dumpState(OutputStream os) throws IOException {
+    public void dumpState(OutputStream os) throws IOException {
         StringBuilder sb = new StringBuilder();
 
         sb.append(String.format("Memory budget = %d\n", memoryBudget));
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index e191fd0..517660b 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -163,12 +163,11 @@
             throw new HyracksDataException("Failed to destroy the index since it is activated.");
         }
 
-        file.delete();
         if (fileId == -1) {
             return;
         }
-
         bufferCache.deleteFile(fileId, false);
+        file.delete();
         fileId = -1;
     }
 
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 443ad2b..8420c73 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -344,4 +344,4 @@
     public ILSMOperationTracker getOperationTracker() {
         return opTracker;
     }
-}
\ No newline at end of file
+}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 4162dc2..03d57f5 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -802,8 +802,22 @@
     @Override
     public void stop(boolean dumpState, OutputStream os) throws IOException {
         if (dumpState) {
-            os.write(dumpState().getBytes());
+            dumpState(os);
         }
         close();
     }
+
+    /**
+     * Writes the report returned by {@link #dumpState()} to the given stream, encoded as UTF-8.
+     *
+     * @param os
+     *            stream that receives the report
+     * @throws IOException
+     *             if writing to the stream fails
+     */
+    @Override
+    public void dumpState(OutputStream os) throws IOException {
+        // An explicit charset keeps the bytes independent of the JVM's default encoding.
+        os.write(dumpState().getBytes("UTF-8"));
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
index 9b77e23..dd2455e 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestNCApplicationContext.java
@@ -18,12 +18,16 @@
 import java.util.concurrent.ThreadFactory;
 
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.IStateDumpHandler;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
 import edu.uci.ics.hyracks.api.job.IJobSerializerDeserializerContainer;
+import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponentManager;
+import edu.uci.ics.hyracks.api.lifecycle.LifeCycleComponentManager;
 import edu.uci.ics.hyracks.api.messages.IMessageBroker;
 import edu.uci.ics.hyracks.api.resources.memory.IMemoryManager;
 
 public class TestNCApplicationContext implements INCApplicationContext {
+    private final ILifeCycleComponentManager lccm;
     private final IHyracksRootContext rootCtx;
     private final String nodeId;
 
@@ -33,6 +37,7 @@
     private final IMemoryManager mm;
 
     public TestNCApplicationContext(IHyracksRootContext rootCtx, String nodeId) {
+        this.lccm = new LifeCycleComponentManager();
         this.rootCtx = rootCtx;
         this.nodeId = nodeId;
         mm = new IMemoryManager() {
@@ -85,8 +90,6 @@
 
     @Override
     public void setMessageBroker(IMessageBroker staticticsConnector) {
-        // TODO Auto-generated method stub
-
     }
 
     @Override
@@ -94,11 +97,10 @@
         return null;
     }
 
-	@Override
-	public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
-		// TODO Auto-generated method stub
-		return null;
-	}
+    @Override
+    public IJobSerializerDeserializerContainer getJobSerializerDeserializerContainer() {
+        return null;
+    }
 
     @Override
     public IMemoryManager getMemoryManager() {
@@ -107,13 +109,23 @@
 
     @Override
     public ThreadFactory getThreadFactory() {
-        // TODO Auto-generated method stub
         return null;
     }
 
     @Override
     public void setThreadFactory(ThreadFactory threadFactory) {
-        // TODO Auto-generated method stub
+    }
 
+    /**
+     * @return the life cycle component manager created in this test context's constructor
+     */
+    @Override
+    public ILifeCycleComponentManager getLifeCycleComponentManager() {
+        return lccm;
+    }
+
+    @Override
+    public void setStateDumpHandler(IStateDumpHandler handler) {
+        // Intentionally empty: this test context does not keep the handler.
     }
 }
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index a42b411..b2ac4a5 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -610,6 +610,6 @@
      * Terminate the current partition where the current vertex stays in.
-     * This will immediately take effect and the upcoming vertice in the
+     * This will immediately take effect and the upcoming vertices in the
      * same partition cannot be processed.
      */
     protected final void terminatePartition() {
         voteToHalt();
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/NoBudgetIndexLifecycleManager.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/NoBudgetIndexLifecycleManager.java
index 5484f78..0c97230 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/NoBudgetIndexLifecycleManager.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/NoBudgetIndexLifecycleManager.java
@@ -178,7 +178,7 @@
         }
     }
 
-    private void dumpState(OutputStream os) throws IOException {
+    public void dumpState(OutputStream os) throws IOException {
         StringBuilder sb = new StringBuilder();
 
         String headerFormat = "%-20s %-10s %-20s %-20s %-20s\n";
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
index 638011b..3fe13b1 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/lib/io/SizeEstimationTest.java
@@ -173,4 +173,4 @@
         }
     }
 
-}
+}
\ No newline at end of file
