Merge branch 'master' into genomix/fullstack_genomix
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index b8bdf3e..ba51eff 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -27,7 +27,6 @@
     INNERJOIN,
     LEFTOUTERJOIN,
     LIMIT,
-    DIE,
     NESTEDTUPLESOURCE,
     ORDER,
     PROJECT,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 0efb5ff..c3a59ea 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -33,7 +33,6 @@
     SPLIT,
     STABLE_SORT,
     STREAM_LIMIT,
-    STREAM_DIE,
     STREAM_SELECT,
     STREAM_PROJECT,
     STRING_STREAM_SCRIPT,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
index 2f53f9b..0c105d0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
@@ -20,10 +20,11 @@
     // private ArrayList<AggregateFunctionCallExpression> expressions;
     // TODO type safe list of expressions
     private List<Mutable<ILogicalExpression>> mergeExpressions;
-    private LogicalVariable partitioningVariable;
+    private boolean global;
 
     public AggregateOperator(List<LogicalVariable> variables, List<Mutable<ILogicalExpression>> expressions) {
         super(variables, expressions);
+        global = true;
     }
 
     @Override
@@ -69,12 +70,12 @@
         return mergeExpressions;
     }
 
-    public void setPartitioningVariable(LogicalVariable partitioningVariable) {
-        this.partitioningVariable = partitioningVariable;
+    public void setGlobal(boolean global) {
+        this.global = global;
     }
 
-    public LogicalVariable getPartitioningVariable() {
-        return partitioningVariable;
+    public boolean isGlobal() {
+        return global;
     }
 
     @Override
@@ -90,4 +91,5 @@
         }
         return env;
     }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
deleted file mode 100644
index 03bfcba..0000000
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/DieOperator.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
-
-import java.util.ArrayList;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-
-public class DieOperator extends AbstractLogicalOperator {
-
-    private final Mutable<ILogicalExpression> afterObjects; // mandatory
-
-    public DieOperator(ILogicalExpression maxObjectsExpr) {
-        this.afterObjects = new MutableObject<ILogicalExpression>(maxObjectsExpr);
-    }
-
-    public Mutable<ILogicalExpression> getAfterObjects() {
-        return afterObjects;
-    }
-
-    @Override
-    public void recomputeSchema() {
-        schema = new ArrayList<LogicalVariable>();
-        schema.addAll(inputs.get(0).getValue().getSchema());
-    }
-
-    @Override
-    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
-        return visitor.visitDieOperator(this, arg);
-    }
-
-    @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
-        boolean b = false;
-        if (visitor.transform(afterObjects)) {
-            b = true;
-        }
-        return b;
-    }
-
-    @Override
-    public LogicalOperatorTag getOperatorTag() {
-        return LogicalOperatorTag.DIE;
-    }
-
-    @Override
-    public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return VariablePropagationPolicy.ALL;
-    }
-
-    @Override
-    public boolean isMap() {
-        return true;
-    }
-
-    @Override
-    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        return createPropagatingAllInputsTypeEnvironment(ctx);
-    }
-
-    @Override
-    public boolean requiresVariableReferenceExpressions() {
-        return false;
-    }
-}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 1b4be1e..527a11c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -45,7 +45,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -357,12 +356,6 @@
     }
 
     @Override
-    public Void visitDieOperator(DieOperator op, IOptimizationContext ctx) throws AlgebricksException {
-        propagateFDsAndEquivClasses(op, ctx);
-        return null;
-    }
-
-    @Override
     public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, IOptimizationContext ctx)
             throws AlgebricksException {
         AbstractLogicalOperator op1 = (AbstractLogicalOperator) op.getDataSourceReference().getValue();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index ac6d887..745b8c6 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -36,7 +36,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -180,16 +179,6 @@
     }
 
     @Override
-    public Boolean visitDieOperator(DieOperator op, ILogicalOperator arg) throws AlgebricksException {
-        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
-        if (aop.getOperatorTag() != LogicalOperatorTag.DIE)
-            return Boolean.FALSE;
-        DieOperator dieOpArg = (DieOperator) copyAndSubstituteVar(op, arg);
-        boolean isomorphic = op.getAfterObjects().getValue().equals(dieOpArg.getAfterObjects().getValue());
-        return isomorphic;
-    }
-
-    @Override
     public Boolean visitInnerJoinOperator(InnerJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
         if (aop.getOperatorTag() != LogicalOperatorTag.INNERJOIN)
@@ -643,11 +632,6 @@
         }
 
         @Override
-        public ILogicalOperator visitDieOperator(DieOperator op, Void arg) throws AlgebricksException {
-            return new DieOperator(deepCopyExpressionRef(op.getAfterObjects()).getValue());
-        }
-
-        @Override
         public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
             return new InnerJoinOperator(deepCopyExpressionRef(op.getCondition()), op.getInputs().get(0), op
                     .getInputs().get(1));
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index b9544c7..ac8cabe 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -35,7 +35,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -109,12 +108,6 @@
     }
 
     @Override
-    public Void visitDieOperator(DieOperator op, ILogicalOperator arg) throws AlgebricksException {
-        mapVariablesStandard(op, arg);
-        return null;
-    }
-
-    @Override
     public Void visitInnerJoinOperator(InnerJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 8f1d686..e4fd78e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -27,7 +27,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -142,12 +141,6 @@
     }
 
     @Override
-    public Void visitDieOperator(DieOperator op, IOptimizationContext arg) throws AlgebricksException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
     public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, IOptimizationContext arg)
             throws AlgebricksException {
         // TODO Auto-generated method stub
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 31adcba..91ffe4b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -30,7 +30,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

@@ -136,11 +135,6 @@
     }

 

     @Override

-    public Void visitDieOperator(DieOperator op, Void arg) throws AlgebricksException {

-        return null;

-    }

-

-    @Override

     public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {

         return null;

     }

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index cd0cee3..0de9652 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -29,7 +29,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

@@ -152,12 +151,6 @@
     }

 

     @Override

-    public Void visitDieOperator(DieOperator op, Void arg) throws AlgebricksException {

-        standardLayout(op);

-        return null;

-    }

-

-    @Override

     public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {

         VariableUtilities.getLiveVariables(op.getSourceOperator(), schemaVariables);

         return null;

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 69fb3f8..bb9b600 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -31,7 +31,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

@@ -185,14 +184,6 @@
     }

 

     @Override

-    public Void visitDieOperator(DieOperator op, Pair<LogicalVariable, LogicalVariable> pair)

-            throws AlgebricksException {

-        op.getAfterObjects().getValue().substituteVar(pair.first, pair.second);

-        substVarTypes(op, pair);

-        return null;

-    }

-

-    @Override

     public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Pair<LogicalVariable, LogicalVariable> pair)

             throws AlgebricksException {

         return null;

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 5361a19..42f7e20 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -30,7 +30,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;

-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;

 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;

@@ -78,9 +77,6 @@
         for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {

             exprRef.getValue().getUsedVariables(usedVariables);

         }

-        if (op.getPartitioningVariable() != null) {

-            usedVariables.add(op.getPartitioningVariable());

-        }

         return null;

     }

 

@@ -198,12 +194,6 @@
     }

 

     @Override

-    public Void visitDieOperator(DieOperator op, Void arg) {

-        op.getAfterObjects().getValue().getUsedVariables(usedVariables);

-        return null;

-    }

-

-    @Override

     public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) {

         // does not use any variable

         return null;

diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
index 81dc2c2..f47c2ec 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AggregatePOperator.java
@@ -16,12 +16,10 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -31,15 +29,14 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
@@ -58,25 +55,29 @@
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        IPhysicalPropertiesVector childProps = op2.getDeliveredPhysicalProperties();
-        deliveredProperties = new StructuralPropertiesVector(childProps.getPartitioningProperty(),
-                new ArrayList<ILocalStructuralProperty>(0));
+        AggregateOperator aggOp = (AggregateOperator) op;
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        if (aggOp.getExecutionMode() != AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+            deliveredProperties = new StructuralPropertiesVector(op2.getDeliveredPhysicalProperties()
+                    .getPartitioningProperty(), new ArrayList<ILocalStructuralProperty>());
+        } else {
+            deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED,
+                    new ArrayList<ILocalStructuralProperty>());
+        }
     }
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
         AggregateOperator aggOp = (AggregateOperator) op;
-        if (aggOp.getExecutionMode() == ExecutionMode.PARTITIONED && aggOp.getPartitioningVariable() != null) {
-            StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
-            Set<LogicalVariable> partitioningVariables = new ListSet<LogicalVariable>();
-            partitioningVariables.add(aggOp.getPartitioningVariable());
-            pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(partitioningVariables, null), null);
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+        if (aggOp.isGlobal() && aggOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+            pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
             return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
         } else {
             return emptyUnaryRequirements();
         }
+
     }
 
     @Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
index 302d4d2..9a96319 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java
@@ -100,10 +100,9 @@
                 context, columns);
 
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime(
-                resultOp.getDataSink(), columns, pf, inputDesc, false, spec);
+                resultOp.getDataSink(), columns, pf, inputDesc, true, spec);
 
         builder.contributeHyracksOperator(resultOp, runtimeAndConstraints.first);
-        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
         ILogicalOperator src = resultOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, resultOp, 0);
     }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java
deleted file mode 100644
index 73b9b79..0000000
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamDiePOperator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.std.StreamDieRuntimeFactory;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class StreamDiePOperator extends AbstractPhysicalOperator {
-
-    public StreamDiePOperator() {
-    }
-
-    @Override
-    public PhysicalOperatorTag getOperatorTag() {
-        return PhysicalOperatorTag.STREAM_DIE;
-    }
-
-    @Override
-    public boolean isMicroOperator() {
-        return true;
-    }
-
-    @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        ILogicalOperator op2 = op.getInputs().get(0).getValue();
-        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
-    }
-
-    @Override
-    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
-        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
-        pv[0] = (StructuralPropertiesVector) reqdByParent;
-        return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
-    }
-
-    @Override
-    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
-        DieOperator die = (DieOperator) op;
-        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
-        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
-        IScalarEvaluatorFactory afterObjectsFact = expressionRuntimeProvider.createEvaluatorFactory(die
-                .getAfterObjects().getValue(), env, inputSchemas, context);
-        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
-        StreamDieRuntimeFactory runtime = new StreamDieRuntimeFactory(afterObjectsFact, null,
-                context.getBinaryIntegerInspectorFactory());
-        builder.contributeMicroOperator(die, runtime, recDesc);
-        ILogicalOperator src = die.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src, 0, die, 0);
-    }
-
-}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index 11e24d7..531b300 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -37,10 +38,8 @@
 
 public class StreamLimitPOperator extends AbstractPhysicalOperator {
 
-    private boolean global;
+    public StreamLimitPOperator() {
 
-    public StreamLimitPOperator(boolean global) {
-        this.global = global;
     }
 
     @Override
@@ -55,14 +54,22 @@
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
         ILogicalOperator op2 = op.getInputs().get(0).getValue();
-        deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+        if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
+            //partitioning property: unpartitioned;  local property: whatever from the child
+            deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, op2
+                    .getDeliveredPhysicalProperties().getLocalProperties());
+        } else {
+            deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
+        }
     }
 
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
-        if (global) {
+        AbstractLogicalOperator limitOp = (AbstractLogicalOperator) op;
+        if (limitOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
             StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
             pv[0] = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, null);
             return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
@@ -83,7 +90,8 @@
         ILogicalExpression offsetExpr = limit.getOffset().getValue();
         IScalarEvaluatorFactory offsetFact = (offsetExpr == null) ? null : expressionRuntimeProvider
                 .createEvaluatorFactory(offsetExpr, env, inputSchemas, context);
-        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
         StreamLimitRuntimeFactory runtime = new StreamLimitRuntimeFactory(maxObjectsFact, offsetFact, null,
                 context.getBinaryIntegerInspectorFactory());
         builder.contributeMicroOperator(limit, runtime, recDesc);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
index 54c0505..7861dc0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamProjectPOperator.java
@@ -31,6 +31,8 @@
 
 public class StreamProjectPOperator extends AbstractPropagatePropertiesForUsedVariablesPOperator {
 
+    private boolean flushFramesRapidly;
+
     @Override
     public PhysicalOperatorTag getOperatorTag() {
         return PhysicalOperatorTag.STREAM_PROJECT;
@@ -61,8 +63,9 @@
             }
             projectionList[i++] = pos;
         }
-        StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList);
-        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        StreamProjectRuntimeFactory runtime = new StreamProjectRuntimeFactory(projectionList, flushFramesRapidly);
+        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
+                context);
         builder.contributeMicroOperator(project, runtime, recDesc);
         ILogicalOperator src = project.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, project, 0);
@@ -74,4 +77,8 @@
         computeDeliveredPropertiesForUsedVariables(p, p.getVariables());
     }
 
+    public void setRapidFrameFlush(boolean flushFramesRapidly) {
+        this.flushFramesRapidly = flushFramesRapidly;
+    }
+
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index fc0c433..4b7597c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -28,7 +28,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -256,13 +255,6 @@
     }
 
     @Override
-    public String visitDieOperator(DieOperator op, Integer indent) {
-        StringBuilder buffer = new StringBuilder();
-        addIndent(buffer, indent).append("die after " + op.getAfterObjects().getValue());
-        return buffer.toString();
-    }
-
-    @Override
     public String visitExchangeOperator(ExchangeOperator op, Integer indent) {
         StringBuilder buffer = new StringBuilder();
         addIndent(buffer, indent).append("exchange ");
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/ResultSetDomain.java
similarity index 68%
rename from hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java
rename to algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/ResultSetDomain.java
index 8f5ed64..ca15346 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/properties/ResultSetDomain.java
@@ -3,19 +3,25 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
- *
+ * 
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.hyracks.api.dataset;
+package edu.uci.ics.hyracks.algebricks.core.algebra.properties;
 
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+public class ResultSetDomain implements INodeDomain {
+    @Override
+    public boolean sameAs(INodeDomain domain) {
+        return true;
+    }
 
-public interface IDatasetPartitionReader {
-    public void writeTo(IFrameWriter writer);
+    @Override
+    public Integer cardinality() {
+        return 0;
+    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 47a979c..35b36dc 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
@@ -96,14 +97,21 @@
                 break;
             }
             default: {
+                boolean forceUnpartitioned = false;
                 if (op.getOperatorTag() == LogicalOperatorTag.LIMIT) {
                     LimitOperator opLim = (LimitOperator) op;
                     if (opLim.isTopmostLimitOp()) {
-                        if (opLim.getExecutionMode() != AbstractLogicalOperator.ExecutionMode.UNPARTITIONED) {
-                            opLim.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
-                            change = true;
-                        }
-                        break;
+                        opLim.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+                        change = true;
+                        forceUnpartitioned = true;
+                    }
+                }
+                if (op.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+                    AggregateOperator aggOp = (AggregateOperator) op;
+                    if (aggOp.isGlobal()) {
+                        op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+                        change = true;
+                        forceUnpartitioned = true;
                     }
                 }
 
@@ -112,6 +120,8 @@
                     AbstractLogicalOperator inputOp = (AbstractLogicalOperator) i.getValue();
                     switch (inputOp.getExecutionMode()) {
                         case PARTITIONED: {
+                            if (forceUnpartitioned)
+                                break;
                             op.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
                             change = true;
                             exit = true;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 23dac2a..32b049e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -18,7 +18,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DieOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
@@ -106,5 +105,4 @@
 
     public R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T tag) throws AlgebricksException;
 
-    public R visitDieOperator(DieOperator op, T arg) throws AlgebricksException;
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
index 08271c1..013ddda 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/AbstractIntroduceCombinerRule.java
@@ -16,12 +16,10 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -110,16 +108,10 @@
             } else {
                 // The local aggregate operator is fed by the input of the original aggregate operator.
                 pushedAgg.getInputs().add(new MutableObject<ILogicalOperator>(initAgg.getInputs().get(0).getValue()));
-                // Set the partitioning variable in the local agg to ensure it is not projected away.
-                context.computeAndSetTypeEnvironmentForOperator(pushedAgg);
-                LogicalVariable trueVar = context.newVar();
                 // Reintroduce assign op for the global agg partitioning var.
-                AssignOperator trueAssignOp = new AssignOperator(trueVar, new MutableObject<ILogicalExpression>(
-                        ConstantExpression.TRUE));
-                trueAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(pushedAgg));
-                context.computeAndSetTypeEnvironmentForOperator(trueAssignOp);
-                initAgg.setPartitioningVariable(trueVar);
-                initAgg.getInputs().get(0).setValue(trueAssignOp);
+                initAgg.getInputs().get(0).setValue(pushedAgg);
+                pushedAgg.setGlobal(false);
+                context.computeAndSetTypeEnvironmentForOperator(pushedAgg);
             }
             return new Pair<Boolean, Mutable<ILogicalOperator>>(true, new MutableObject<ILogicalOperator>(pushedAgg));
         } else {
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index 3260ca0..7bc150a 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -51,6 +51,7 @@
         orderBreakingOps.add(LogicalOperatorTag.INNERJOIN);
         orderBreakingOps.add(LogicalOperatorTag.LEFTOUTERJOIN);
         orderBreakingOps.add(LogicalOperatorTag.UNIONALL);
+        orderBreakingOps.add(LogicalOperatorTag.AGGREGATE);
     }
 
     @Override
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
index c3d935c..c75db57 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroduceAggregateCombinerRule.java
@@ -29,7 +29,7 @@
             return false;
         }
         AggregateOperator aggOp = (AggregateOperator) op;
-        if (aggOp.getExecutionMode() != ExecutionMode.PARTITIONED || aggOp.getPartitioningVariable() == null) {
+        if (!aggOp.isGlobal() || aggOp.getExecutionMode() == ExecutionMode.LOCAL) {
             return false;
         }
         Map<AggregateFunctionCallExpression, SimilarAggregatesInfo> toReplaceMap = new HashMap<AggregateFunctionCallExpression, SimilarAggregatesInfo>();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushDieUpRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushDieUpRule.java
deleted file mode 100644
index c4dd78d..0000000
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushDieUpRule.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package edu.uci.ics.hyracks.algebricks.rewriter.rules;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class PushDieUpRule implements IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
-        return false;
-    }
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-        AbstractLogicalOperator op0 = (AbstractLogicalOperator) opRef.getValue();
-        if (op0.getInputs().size() == 0)
-            return false;
-        AbstractLogicalOperator op1 = (AbstractLogicalOperator) op0.getInputs().get(0).getValue();
-
-        if (op1.getInputs().size() == 0)
-            return false;
-        LogicalOperatorTag tag = op1.getOperatorTag();
-        if (tag == LogicalOperatorTag.SINK || tag == LogicalOperatorTag.WRITE
-                || tag == LogicalOperatorTag.INSERT_DELETE || tag == LogicalOperatorTag.WRITE_RESULT)
-            return false;
-
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
-        if (op2.getOperatorTag() == LogicalOperatorTag.DIE) {
-            op0.getInputs().get(0).setValue(op2);
-            op1.getInputs().clear();
-            for (Mutable<ILogicalOperator> ref : op2.getInputs())
-                op1.getInputs().add(ref);
-            op2.getInputs().clear();
-            op2.getInputs().add(new MutableObject<ILogicalOperator>(op1));
-
-            context.computeAndSetTypeEnvironmentForOperator(op0);
-            context.computeAndSetTypeEnvironmentForOperator(op1);
-            context.computeAndSetTypeEnvironmentForOperator(op2);
-            return true;
-        } else {
-            return false;
-        }
-    }
-}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
index acfdbd3..e5f60bc 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
@@ -106,7 +106,7 @@
                     opLim.getMaxObjects(), opLim.getOffset());
             clone2 = new LimitOperator(maxPlusOffset, false);
         }
-        clone2.setPhysicalOperator(new StreamLimitPOperator(false));
+        clone2.setPhysicalOperator(new StreamLimitPOperator());
         clone2.getInputs().add(new MutableObject<ILogicalOperator>(op2));
         clone2.setExecutionMode(op2.getExecutionMode());
         clone2.recomputeSchema();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 60a4fbb..9c8ad46 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -21,7 +21,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -53,7 +52,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SinkWritePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamDiePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamSelectPOperator;
@@ -173,9 +171,7 @@
                     break;
                 }
                 case LIMIT: {
-                    LimitOperator opLim = (LimitOperator) op;
-                    op.setPhysicalOperator(new StreamLimitPOperator(opLim.isTopmostLimitOp()
-                            && opLim.getExecutionMode() == ExecutionMode.PARTITIONED));
+                    op.setPhysicalOperator(new StreamLimitPOperator());
                     break;
                 }
                 case NESTEDTUPLESOURCE: {
@@ -280,10 +276,6 @@
                     op.setPhysicalOperator(new SinkPOperator());
                     break;
                 }
-                case DIE: {
-                    op.setPhysicalOperator(new StreamDiePOperator());
-                    break;
-                }
             }
         }
         if (op.hasNestedPlans()) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index cd9931b..79e964b 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -54,6 +54,11 @@
     }
 
     protected void appendProjectionToFrame(int tIndex, int[] projectionList) throws HyracksDataException {
+        appendProjectionToFrame(tIndex, projectionList, false);
+    }
+
+    protected void appendProjectionToFrame(int tIndex, int[] projectionList, boolean flushFrame)
+            throws HyracksDataException {
         if (!appender.appendProjection(tAccess, tIndex, projectionList)) {
             FrameUtils.flushFrame(frame, writer);
             appender.reset(frame, true);
@@ -61,6 +66,11 @@
                 throw new IllegalStateException(
                         "Could not write frame (AbstractOneInputOneOutputOneFramePushRuntime.appendProjectionToFrame).");
             }
+            return;
+        }
+        if (flushFrame) {
+            FrameUtils.flushFrame(frame, writer);
+            appender.reset(frame, true);
         }
     }
 
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
deleted file mode 100644
index 508ce5d..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamDieRuntimeFactory.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package edu.uci.ics.hyracks.algebricks.runtime.operators.std;
-
-import java.nio.ByteBuffer;
-
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspector;
-import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.api.IPointable;
-import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
-
-public class StreamDieRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private IScalarEvaluatorFactory aftterObjectsEvalFactory;
-    private IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory;
-
-    public StreamDieRuntimeFactory(IScalarEvaluatorFactory maxObjectsEvalFactory, int[] projectionList,
-            IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory) {
-        super(projectionList);
-        this.aftterObjectsEvalFactory = maxObjectsEvalFactory;
-        this.binaryIntegerInspectorFactory = binaryIntegerInspectorFactory;
-    }
-
-    @Override
-    public String toString() {
-        String s = "stream-die " + aftterObjectsEvalFactory.toString();
-        return s;
-    }
-
-    @Override
-    public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx) {
-        final IBinaryIntegerInspector bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
-        return new AbstractOneInputOneOutputOneFramePushRuntime() {
-            private IPointable p = VoidPointable.FACTORY.createPointable();
-            private IScalarEvaluator evalAfterObjects;
-            private int toWrite = -1;
-
-            @Override
-            public void open() throws HyracksDataException {
-                if (evalAfterObjects == null) {
-                    initAccessAppendRef(ctx);
-                    try {
-                        evalAfterObjects = aftterObjectsEvalFactory.createScalarEvaluator(ctx);
-                    } catch (AlgebricksException ae) {
-                        throw new HyracksDataException(ae);
-                    }
-                }
-                writer.open();
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                tAccess.reset(buffer);
-                int nTuple = tAccess.getTupleCount();
-                if (toWrite < 0) {
-                    toWrite = evaluateInteger(evalAfterObjects, 0);
-                }
-                for (int t = 0; t < nTuple; t++) {
-                    if (toWrite > 0) {
-                        toWrite--;
-                        if (projectionList != null) {
-                            appendProjectionToFrame(t, projectionList);
-                        } else {
-                            appendTupleToFrame(t);
-                        }
-                    } else {
-                        throw new HyracksDataException("injected failure");
-//                    	System.out.println("Injected Kill-JVM");
-//                    	System.exit(-1);
-                    }
-                }
-            }
-
-            @Override
-            public void close() throws HyracksDataException {
-                super.close();
-            }
-
-            private int evaluateInteger(IScalarEvaluator eval, int tIdx) throws HyracksDataException {
-                tRef.reset(tAccess, tIdx);
-                try {
-                    eval.evaluate(tRef, p);
-                } catch (AlgebricksException ae) {
-                    throw new HyracksDataException(ae);
-                }
-                int lim = bii.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
-                return lim;
-            }
-
-        };
-    }
-
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 455ad8e..0e6fc15 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -26,9 +26,16 @@
 public class StreamProjectRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
+    private final boolean flushFramesRapidly;
+
+    public StreamProjectRuntimeFactory(int[] projectionList, boolean flushFramesRapidly) {
+        super(projectionList);
+        this.flushFramesRapidly = flushFramesRapidly;
+    }
 
     public StreamProjectRuntimeFactory(int[] projectionList) {
         super(projectionList);
+        this.flushFramesRapidly = false;
     }
 
     @Override
@@ -57,7 +64,16 @@
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
                 tAccess.reset(buffer);
                 int nTuple = tAccess.getTupleCount();
-                for (int t = 0; t < nTuple; t++) {
+
+                int t = 0;
+                for (; t < nTuple - 1; t++) {
+                    appendProjectionToFrame(t, projectionList);
+                }
+                if (flushFramesRapidly) {
+                    // Whenever all the tuples in the incoming frame have been consumed, the project operator 
+                    // will push its frame to the next operator; i.e., it won't wait until the frame gets full. 
+                    appendProjectionToFrame(t, projectionList, true);
+                } else {
                     appendProjectionToFrame(t, projectionList);
                 }
 
diff --git a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties
index 1cc34e1..f42e321 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties
+++ b/hivesterix/hivesterix-dist/src/test/resources/runtimefunctionts/logging.properties
@@ -60,6 +60,5 @@
 # For example, set the com.xyz.foo logger to only log SEVERE
 # messages:
 
-edu.uci.ics.asterix.level = WARNING
-edu.uci.ics.algebricks.level = WARNING
+#edu.uci.ics.hyracks.algebricks.level = FINEST
 edu.uci.ics.hyracks.level = WARNING
diff --git a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
index 7e4e271..a049f15 100644
--- a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
+++ b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
@@ -40,9 +40,7 @@
     public final static LinkedList<IAlgebraicRewriteRule> NORMALIZATION = new LinkedList<IAlgebraicRewriteRule>();

     static {

         NORMALIZATION.add(new EliminateSubplanRule());

-        NORMALIZATION.add(new IntroduceAggregateCombinerRule());

         NORMALIZATION.add(new BreakSelectIntoConjunctsRule());

-        NORMALIZATION.add(new IntroduceAggregateCombinerRule());

         NORMALIZATION.add(new PushSelectIntoJoinRule());

         NORMALIZATION.add(new ExtractGbyExpressionsRule());

         NORMALIZATION.add(new RemoveRedundantSelectRule());

@@ -84,6 +82,7 @@
         CONSOLIDATION.add(new IntroduceEarlyProjectRule());

         CONSOLIDATION.add(new ConsolidateAssignsRule());

         CONSOLIDATION.add(new IntroduceGroupByCombinerRule());

+        CONSOLIDATION.add(new IntroduceAggregateCombinerRule());

         CONSOLIDATION.add(new RemoveUnusedAssignAndAggregateRule());

     }

 

diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index ae38c7f..dea077d 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -29,7 +29,10 @@
 
     public void reportPartitionFailure(JobId jobId, ResultSetId resultSetId, int partition) throws HyracksException;
 
-    public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc) throws HyracksException;
+    public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition, IFrameWriter noc)
+            throws HyracksException;
+
+    public void abortReader(JobId jobId);
 
     public IWorkspaceFileFactory getFileFactory();
 
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java
deleted file mode 100644
index 42dc157..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IDatasetPartitionWriter extends IFrameWriter {
-    public Page returnPage() throws HyracksDataException;
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
index d49d5cd..f29356b 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceConnection.java
@@ -14,7 +14,7 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IHyracksDatasetDirectoryServiceConnection {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
index ba21a84..3fe4ada 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetDirectoryServiceInterface.java
@@ -14,7 +14,7 @@
  */
 package edu.uci.ics.hyracks.api.dataset;
 
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.job.JobId;
 
 public interface IHyracksDatasetDirectoryServiceInterface {
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
index b928a49..c397a94 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IHyracksDatasetReader.java
@@ -16,7 +16,7 @@
 
 import java.nio.ByteBuffer;
 
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public interface IHyracksDatasetReader {
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
index 095fd7d..c882448 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceConnection.java
@@ -17,7 +17,7 @@
 import java.net.InetSocketAddress;
 
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
index 47cdf97..9c3b918 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetDirectoryServiceInterfaceRemoteProxy.java
@@ -16,7 +16,7 @@
 
 import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceInterface;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
diff --git a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
index 6419983..33e1b01 100644
--- a/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
+++ b/hyracks/hyracks-client/src/main/java/edu/uci/ics/hyracks/client/dataset/HyracksDatasetReader.java
@@ -26,7 +26,7 @@
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
+import edu.uci.ics.hyracks.api.dataset.DatasetJobRecord.Status;
 import edu.uci.ics.hyracks.api.dataset.IDatasetInputChannelMonitor;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetDirectoryServiceConnection;
 import edu.uci.ics.hyracks.api.dataset.IHyracksDatasetReader;
@@ -101,7 +101,7 @@
                             knownRecords);
                     lastReadPartition = 0;
                     resultChannel = new DatasetNetworkInputChannel(netManager,
-                            getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
+                            getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId, lastReadPartition,
                             NUM_READ_BUFFERS);
                     lastMonitor = getMonitor(lastReadPartition);
                     resultChannel.open(datasetClientCtx);
@@ -118,7 +118,7 @@
 
         while (readSize <= 0 && !(isLastPartitionReadComplete())) {
             synchronized (lastMonitor) {
-                while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached()) {
+                while (lastMonitor.getNFramesAvailable() <= 0 && !lastMonitor.eosReached() && !lastMonitor.failed()) {
                     try {
                         lastMonitor.wait();
                     } catch (InterruptedException e) {
@@ -127,6 +127,9 @@
                 }
             }
 
+            if (lastMonitor.failed()) {
+                throw new HyracksDataException("Job Failed.");
+            }
             if (isPartitionReadComplete(lastMonitor)) {
                 knownRecords[lastReadPartition].readEOS();
                 if ((lastReadPartition == knownRecords.length - 1)) {
@@ -135,23 +138,17 @@
                     try {
                         lastReadPartition++;
                         while (knownRecords[lastReadPartition] == null) {
-                            try {
-                                knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId,
-                                        resultSetId, knownRecords);
-                            } catch (Exception e) {
-                                // Do nothing here.
-                            }
+                            knownRecords = datasetDirectoryServiceConnection.getDatasetResultLocations(jobId,
+                                    resultSetId, knownRecords);
                         }
 
                         resultChannel = new DatasetNetworkInputChannel(netManager,
-                                getSocketAddress(knownRecords[lastReadPartition]), jobId, lastReadPartition,
-                                NUM_READ_BUFFERS);
+                                getSocketAddress(knownRecords[lastReadPartition]), jobId, resultSetId,
+                                lastReadPartition, NUM_READ_BUFFERS);
                         lastMonitor = getMonitor(lastReadPartition);
                         resultChannel.open(datasetClientCtx);
                         resultChannel.registerMonitor(lastMonitor);
-                    } catch (HyracksException e) {
-                        throw new HyracksDataException(e);
-                    } catch (UnknownHostException e) {
+                    } catch (Exception e) {
                         throw new HyracksDataException(e);
                     }
                 }
diff --git a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
index fac2949..1ab315b 100644
--- a/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++ b/hyracks/hyracks-comm/src/main/java/edu/uci/ics/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
@@ -41,6 +42,8 @@
 
     private final JobId jobId;
 
+    private final ResultSetId resultSetId;
+
     private final int partition;
 
     private final Queue<ByteBuffer> fullQueue;
@@ -54,10 +57,11 @@
     private Object attachment;
 
     public DatasetNetworkInputChannel(IChannelConnectionFactory netManager, SocketAddress remoteAddress, JobId jobId,
-            int partition, int nBuffers) {
+            ResultSetId resultSetId, int partition, int nBuffers) {
         this.netManager = netManager;
         this.remoteAddress = remoteAddress;
         this.jobId = jobId;
+        this.resultSetId = resultSetId;
         this.partition = partition;
         fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
         this.nBuffers = nBuffers;
@@ -103,6 +107,7 @@
         }
         ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
         writeBuffer.putLong(jobId.getId());
+        writeBuffer.putLong(resultSetId.getId());
         writeBuffer.putInt(partition);
         writeBuffer.flip();
         if (LOGGER.isLoggable(Level.FINE)) {
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
index e648733..21b05d4 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -99,7 +99,7 @@
         records[partition].writeEOS();
 
         for (DatasetDirectoryRecord record : records) {
-            if (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS) {
+            if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) {
                 successCount++;
             }
         }
@@ -112,14 +112,18 @@
     @Override
     public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
         DatasetJobRecord djr = jobResultLocations.get(jobId);
-        djr.fail();
+        if (djr != null) {
+            djr.fail();
+        }
         notifyAll();
     }
 
     @Override
     public synchronized void reportJobFailure(JobId jobId) {
         DatasetJobRecord djr = jobResultLocations.get(jobId);
-        djr.fail();
+        if (djr != null) {
+            djr.fail();
+        }
         notifyAll();
     }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
index cecd677..4e27f12 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -21,12 +21,13 @@
 import java.util.Map;
 import java.util.Set;
 
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
 import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
 
 public class DatasetMemoryManager {
+    private int availableMemory;
+
     private final Set<Page> availPages;
 
     private final LeastRecentlyUsedList leastRecentlyUsedList;
@@ -36,29 +37,32 @@
     private final static int FRAME_SIZE = 32768;
 
     public DatasetMemoryManager(int availableMemory) {
+        this.availableMemory = availableMemory;
+
         availPages = new HashSet<Page>();
 
         // Atleast have one page for temporarily storing the results.
-        if (availableMemory <= 0)
-            availableMemory = FRAME_SIZE;
-
-        while (availableMemory >= FRAME_SIZE) {
-            /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's allocateFrame()
-             * instead of direct ByteBuffer.allocate()?
-             */
-            availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
-            availableMemory -= FRAME_SIZE;
-        }
+        if (this.availableMemory <= FRAME_SIZE)
+            this.availableMemory = FRAME_SIZE;
 
         leastRecentlyUsedList = new LeastRecentlyUsedList();
         resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>();
     }
 
-    public Page requestPage(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter dpw)
-            throws OutOfMemoryError, HyracksDataException {
+    public synchronized Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState)
+            throws HyracksDataException {
         Page page;
         if (availPages.isEmpty()) {
-            page = evictPage();
+            if (availableMemory >= FRAME_SIZE) {
+                /* TODO(madhusudancs): Should we have some way of accounting this memory usage by using Hyrack's allocateFrame()
+                 * instead of direct ByteBuffer.allocate()?
+                 */
+                availPages.add(new Page(ByteBuffer.allocate(FRAME_SIZE)));
+                availableMemory -= FRAME_SIZE;
+                page = getAvailablePage();
+            } else {
+                page = evictPage();
+            }
         } else {
             page = getAvailablePage();
         }
@@ -71,7 +75,7 @@
          * update reference call before a page is pushed on to the element of the LRU list. So we first obtain the page,
          * then make a updateReference call which in turn creates a new node in the LRU list and then add the page to it.
          */
-        PartitionNode pn = updateReference(resultSetPartitionId, dpw);
+        PartitionNode pn = updateReference(resultSetPartitionId, resultState);
         pn.add(page);
         return page;
     }
@@ -81,7 +85,7 @@
         updateReference(resultSetPartitionId, null);
     }
 
-    public int getPageSize() {
+    public static int getPageSize() {
         return FRAME_SIZE;
     }
 
@@ -90,28 +94,29 @@
         resultPartitionNodesMap.put(resultSetPartitionId, pn);
     }
 
-    protected synchronized PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId,
-            IDatasetPartitionWriter dpw) {
+    protected PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
         PartitionNode pn = null;
 
         if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) {
-            if (dpw != null) {
-                pn = new PartitionNode(resultSetPartitionId, dpw);
+            if (resultState != null) {
+                pn = new PartitionNode(resultSetPartitionId, resultState);
                 insertPartitionNode(resultSetPartitionId, pn);
             }
             return pn;
         }
-        pn = resultPartitionNodesMap.get(resultSetPartitionId);
-        leastRecentlyUsedList.remove(pn);
-        insertPartitionNode(resultSetPartitionId, pn);
+        synchronized (this) {
+            pn = resultPartitionNodesMap.get(resultSetPartitionId);
+            leastRecentlyUsedList.remove(pn);
+            insertPartitionNode(resultSetPartitionId, pn);
+        }
 
         return pn;
     }
 
-    protected synchronized Page evictPage() throws HyracksDataException {
+    protected Page evictPage() throws HyracksDataException {
         PartitionNode pn = leastRecentlyUsedList.getFirst();
-        IDatasetPartitionWriter dpw = pn.getDatasetPartitionWriter();
-        Page page = dpw.returnPage();
+        ResultState resultState = pn.getResultState();
+        Page page = resultState.returnPage();
 
         /* If the partition holding the pages breaks the contract by not returning the page or it has no page, just take
          * away all the pages allocated to it and add to the available pages set.
@@ -140,7 +145,7 @@
         return page;
     }
 
-    protected synchronized Page getAvailablePage() {
+    protected Page getAvailablePage() {
         Iterator<Page> iter = availPages.iterator();
         Page page = iter.next();
         iter.remove();
@@ -197,15 +202,15 @@
 
         private final ResultSetPartitionId resultSetPartitionId;
 
-        private final IDatasetPartitionWriter datasetPartitionWriter;
+        private final ResultState resultState;
 
         private PartitionNode prev;
 
         private PartitionNode next;
 
-        public PartitionNode(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter datasetPartitionWriter) {
+        public PartitionNode(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
             this.resultSetPartitionId = resultSetPartitionId;
-            this.datasetPartitionWriter = datasetPartitionWriter;
+            this.resultState = resultState;
             prev = null;
             next = null;
         }
@@ -214,8 +219,8 @@
             return resultSetPartitionId;
         }
 
-        public IDatasetPartitionWriter getDatasetPartitionWriter() {
-            return datasetPartitionWriter;
+        public ResultState getResultState() {
+            return resultState;
         }
 
         public void setPrev(PartitionNode node) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index af9a607..2ec7acc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -14,14 +14,15 @@
  */
 package edu.uci.ics.hyracks.control.nc.dataset;
 
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
@@ -32,11 +33,13 @@
 import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
 public class DatasetPartitionManager implements IDatasetPartitionManager {
+    private static final Logger LOGGER = Logger.getLogger(DatasetPartitionManager.class.getName());
+
     private final NodeControllerService ncs;
 
     private final Executor executor;
 
-    private final Map<JobId, ResultState[]> partitionResultStateMap;
+    private final Map<JobId, Map<ResultSetId, ResultState[]>> partitionResultStateMap;
 
     private final DefaultDeallocatableRegistry deallocatableRegistry;
 
@@ -50,18 +53,34 @@
         this.executor = executor;
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
-        datasetMemoryManager = new DatasetMemoryManager(availableMemory);
-        partitionResultStateMap = new LinkedHashMap<JobId, ResultState[]>() {
+        if (availableMemory >= DatasetMemoryManager.getPageSize()) {
+            datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+        } else {
+            datasetMemoryManager = null;
+        }
+        partitionResultStateMap = new LinkedHashMap<JobId, Map<ResultSetId, ResultState[]>>() {
             private static final long serialVersionUID = 1L;
 
-            protected boolean removeEldestEntry(Map.Entry<JobId, ResultState[]> eldest) {
-                if (size() > resultHistorySize) {
-                    for (ResultState state : eldest.getValue()) {
-                        state.deinit();
+            protected boolean removeEldestEntry(Map.Entry<JobId, Map<ResultSetId, ResultState[]>> eldest) {
+                synchronized (DatasetPartitionManager.this) {
+                    if (size() > resultHistorySize) {
+                        Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(eldest.getValue());
+                        for (ResultSetId rsId : rsIdMap.keySet()) {
+                            ResultState[] resultStates = rsIdMap.get(rsId);
+                            if (resultStates != null) {
+                                for (int i = 0; i < resultStates.length; i++) {
+                                    ResultState state = resultStates[i];
+                                    if (state != null) {
+                                        state.closeAndDelete();
+                                        LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
+                                    }
+                                }
+                            }
+                        }
+                        return true;
                     }
-                    return true;
+                    return false;
                 }
-                return false;
             }
         };
     }
@@ -72,15 +91,21 @@
         DatasetPartitionWriter dpw = null;
         JobId jobId = ctx.getJobletContext().getJobId();
         try {
-            synchronized (partitionResultStateMap) {
+            synchronized (this) {
                 ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
                         nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
-                dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
+                dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager, fileFactory);
 
-                ResultState[] resultStates = partitionResultStateMap.get(jobId);
+                Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+                if (rsIdMap == null) {
+                    rsIdMap = new HashMap<ResultSetId, ResultState[]>();
+                    partitionResultStateMap.put(jobId, rsIdMap);
+                }
+
+                ResultState[] resultStates = rsIdMap.get(rsId);
                 if (resultStates == null) {
                     resultStates = new ResultState[nPartitions];
-                    partitionResultStateMap.put(jobId, resultStates);
+                    rsIdMap.put(rsId, resultStates);
                 }
                 resultStates[partition] = dpw.getResultState();
             }
@@ -88,12 +113,15 @@
             throw new HyracksException(e);
         }
 
+        LOGGER.fine("Initialized partition writer: JobId: " + jobId + ":partition: " + partition);
         return dpw;
     }
 
     @Override
     public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
         try {
+            LOGGER.fine("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId
+                    + ":partition: " + partition);
             ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
         } catch (Exception e) {
             throw new HyracksException(e);
@@ -103,6 +131,8 @@
     @Override
     public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
         try {
+            LOGGER.info("Reporting partition failure: JobId: " + jobId + ": ResultSetId: " + rsId + ":partition: "
+                    + partition);
             ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
         } catch (Exception e) {
             throw new HyracksException(e);
@@ -110,24 +140,51 @@
     }
 
     @Override
-    public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
-            throws HyracksException {
+    public void initializeDatasetPartitionReader(JobId jobId, ResultSetId resultSetId, int partition,
+            IFrameWriter writer) throws HyracksException {
         ResultState resultState;
-        synchronized (partitionResultStateMap) {
-            ResultState[] resultStates = partitionResultStateMap.get(jobId);
+        synchronized (this) {
+            Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
 
-            if (resultStates == null) {
+            if (rsIdMap == null) {
                 throw new HyracksException("Unknown JobId " + jobId);
             }
 
+            ResultState[] resultStates = rsIdMap.get(resultSetId);
+            if (resultStates == null) {
+                throw new HyracksException("Unknown JobId: " + jobId + " ResultSetId: " + resultSetId);
+            }
+
             resultState = resultStates[partition];
             if (resultState == null) {
                 throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
             }
         }
 
-        IDatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
+        DatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
         dpr.writeTo(writer);
+        LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":ResultSetId: " + resultSetId + ":partition: "
+                + partition);
+    }
+
+    @Override
+    public synchronized void abortReader(JobId jobId) {
+        Map<ResultSetId, ResultState[]> rsIdMap = partitionResultStateMap.get(jobId);
+
+        if (rsIdMap == null) {
+            return;
+        }
+
+        for (ResultSetId rsId : rsIdMap.keySet()) {
+            ResultState[] resultStates = rsIdMap.get(rsId);
+            if (resultStates != null) {
+                for (ResultState state : resultStates) {
+                    if (state != null) {
+                        state.abort();
+                    }
+                }
+            }
+        }
     }
 
     @Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
index a584b4b..07624de 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -20,14 +20,10 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
-import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
 
-public class DatasetPartitionReader implements IDatasetPartitionReader {
+public class DatasetPartitionReader {
     private static final Logger LOGGER = Logger.getLogger(DatasetPartitionReader.class.getName());
 
     private final DatasetMemoryManager datasetMemoryManager;
@@ -36,51 +32,12 @@
 
     private final ResultState resultState;
 
-    private IFileHandle fileHandle;
-
     public DatasetPartitionReader(DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
         this.datasetMemoryManager = datasetMemoryManager;
         this.executor = executor;
         this.resultState = resultState;
     }
 
-    private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
-        long readSize = 0;
-        synchronized (resultState) {
-            while (offset >= resultState.getSize() && !resultState.getEOS()) {
-                try {
-                    resultState.wait();
-                } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-        }
-
-        if (offset >= resultState.getSize() && resultState.getEOS()) {
-            return readSize;
-        }
-
-        if (offset < resultState.getPersistentSize()) {
-            readSize = resultState.getIOManager().syncRead(fileHandle, offset, buffer);
-        }
-
-        if (readSize < buffer.capacity()) {
-            long localPageOffset = offset - resultState.getPersistentSize();
-            int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
-            int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
-            Page page = resultState.getPage(localPageIndex);
-            if (page == null) {
-            	return readSize;
-            }
-            readSize += buffer.remaining();
-            buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
-        }
-
-        datasetMemoryManager.pageReferenced(resultState.getResultSetPartitionId());
-        return readSize;
-    }
-
-    @Override
     public void writeTo(final IFrameWriter writer) {
         executor.execute(new Runnable() {
             @Override
@@ -88,8 +45,7 @@
                 NetworkOutputChannel channel = (NetworkOutputChannel) writer;
                 channel.setFrameSize(resultState.getFrameSize());
                 try {
-                    fileHandle = resultState.getIOManager().open(resultState.getValidFileReference(),
-                            IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+                    resultState.readOpen();
                     channel.open();
                     try {
                         long offset = 0;
@@ -109,10 +65,8 @@
                         }
                     } finally {
                         channel.close();
-                        resultState.getIOManager().close(fileHandle);
+                        resultState.readClose();
                     }
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
                 } catch (HyracksDataException e) {
                     throw new RuntimeException(e);
                 }
@@ -120,6 +74,14 @@
                     LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")");
                 }
             }
+
+            private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+                if (datasetMemoryManager == null) {
+                    return resultState.read(offset, buffer);
+                } else {
+                    return resultState.read(datasetMemoryManager, offset, buffer);
+                }
+            }
         });
     }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 317f553..8f4b639 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -18,24 +18,19 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
-import edu.uci.ics.hyracks.api.dataset.Page;
 import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
 
-public class DatasetPartitionWriter implements IDatasetPartitionWriter {
+public class DatasetPartitionWriter implements IFrameWriter {
     private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
 
-    private static final String FILE_PREFIX = "result_";
-
     private final IDatasetPartitionManager manager;
 
     private final JobId jobId;
@@ -50,10 +45,9 @@
 
     private final ResultState resultState;
 
-    private IFileHandle fileHandle;
-
     public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
-            ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager) {
+            ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager,
+            IWorkspaceFileFactory fileFactory) {
         this.manager = manager;
         this.jobId = jobId;
         this.resultSetId = rsId;
@@ -61,7 +55,7 @@
         this.datasetMemoryManager = datasetMemoryManager;
 
         resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
-        resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), ctx.getFrameSize());
+        resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), fileFactory, ctx.getFrameSize());
     }
 
     public ResultState getResultState() {
@@ -69,41 +63,27 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    public void open() {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("open(" + partition + ")");
         }
-        String fName = FILE_PREFIX + String.valueOf(partition);
-        FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
-        fileHandle = resultState.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
-                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-        resultState.init(fRef, fileHandle);
+        resultState.open();
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        int srcOffset = 0;
-        Page destPage = resultState.getLastPage();
-
-        while (srcOffset < buffer.limit()) {
-            if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
-                destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
-                resultState.addPage(destPage);
-            }
-            int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
-            destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
-            srcOffset += srcLength;
-            resultState.incrementSize(srcLength);
-        }
-
-        synchronized (resultState) {
-            resultState.notifyAll();
+        if (datasetMemoryManager == null) {
+            resultState.write(buffer);
+        } else {
+            resultState.write(datasetMemoryManager, buffer);
         }
     }
 
     @Override
     public void fail() throws HyracksDataException {
         try {
+            resultState.closeAndDelete();
+            resultState.abort();
             manager.reportPartitionFailure(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
@@ -117,32 +97,10 @@
         }
 
         try {
-            synchronized (resultState) {
-                resultState.setEOS(true);
-                resultState.notifyAll();
-            }
+            resultState.close();
             manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
         } catch (HyracksException e) {
             throw new HyracksDataException(e);
         }
     }
-
-    @Override
-    public Page returnPage() throws HyracksDataException {
-        Page page = resultState.removePage(0);
-
-        IIOManager ioManager = resultState.getIOManager();
-
-        // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
-        if (page == null) {
-            ioManager.close(fileHandle);
-            return null;
-        }
-
-        page.getBuffer().flip();
-
-        long delta = ioManager.syncWrite(fileHandle, resultState.getPersistentSize(), page.getBuffer());
-        resultState.incrementPersistentSize(delta);
-        return page;
-    }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 661df93..911f372 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -17,28 +17,35 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
 import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
 
 public class ResultState implements IStateObject {
+    private static final String FILE_PREFIX = "result_";
+
     private final ResultSetPartitionId resultSetPartitionId;
 
     private final int frameSize;
 
     private final IIOManager ioManager;
 
+    private final IWorkspaceFileFactory fileFactory;
+
     private final AtomicBoolean eos;
 
-    private final AtomicBoolean readEOS;
+    private final AtomicBoolean failed;
 
     private final List<Page> localPageList;
 
@@ -46,29 +53,40 @@
 
     private IFileHandle writeFileHandle;
 
+    private IFileHandle readFileHandle;
+
     private long size;
 
     private long persistentSize;
 
-    ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, int frameSize) {
+    ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, IWorkspaceFileFactory fileFactory,
+            int frameSize) {
         this.resultSetPartitionId = resultSetPartitionId;
         this.ioManager = ioManager;
+        this.fileFactory = fileFactory;
         this.frameSize = frameSize;
         eos = new AtomicBoolean(false);
-        readEOS = new AtomicBoolean(false);
+        failed = new AtomicBoolean(false);
         localPageList = new ArrayList<Page>();
+
+        fileRef = null;
+        writeFileHandle = null;
     }
 
-    public synchronized void init(FileReference fileRef, IFileHandle writeFileHandle) {
-        this.fileRef = fileRef;
-        this.writeFileHandle = writeFileHandle;
-
+    public synchronized void open() {
         size = 0;
         persistentSize = 0;
+    }
+
+    public synchronized void close() {
+        eos.set(true);
         notifyAll();
     }
 
-    public synchronized void deinit() {
+    public synchronized void closeAndDelete() {
+        // Deleting a job is equivalent to aborting the job for all practical purposes, so the same action, needs
+        // to be taken when there are more requests to these result states.
+        failed.set(true);
         if (writeFileHandle != null) {
             try {
                 ioManager.close(writeFileHandle);
@@ -76,7 +94,149 @@
                 // Since file handle could not be closed, just ignore.
             }
         }
-        fileRef.delete();
+        if (fileRef != null) {
+            fileRef.delete();
+        }
+    }
+
+    public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
+        if (fileRef == null) {
+            String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+            writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        }
+
+        size += ioManager.syncWrite(writeFileHandle, size, buffer);
+
+        notifyAll();
+    }
+
+    public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer)
+            throws HyracksDataException {
+        int srcOffset = 0;
+        Page destPage = null;
+
+        if (!localPageList.isEmpty()) {
+            destPage = localPageList.get(localPageList.size() - 1);
+        }
+
+        while (srcOffset < buffer.limit()) {
+            if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
+                destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+                localPageList.add(destPage);
+            }
+            int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
+            destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+            srcOffset += srcLength;
+            size += srcLength;
+        }
+
+        notifyAll();
+    }
+
+    public synchronized void readOpen() {
+        // It is a noOp for now, leaving here to keep the API stable for future usage.
+    }
+
+    public synchronized void readClose() throws HyracksDataException {
+        if (readFileHandle != null) {
+            ioManager.close(readFileHandle);
+        }
+    }
+
+    public synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+        long readSize = 0;
+
+        while (offset >= size && !eos.get() && !failed.get()) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        if ((offset >= size && eos.get()) || failed.get()) {
+            return readSize;
+        }
+
+        if (readFileHandle == null) {
+            initReadFileHandle();
+        }
+        readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+
+        return readSize;
+    }
+
+    public long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
+            throws HyracksDataException {
+        long readSize = 0;
+        synchronized (this) {
+            while (offset >= size && !eos.get() && !failed.get()) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+
+            if ((offset >= size && eos.get()) || failed.get()) {
+                return readSize;
+            }
+
+            if (offset < persistentSize) {
+                if (readFileHandle == null) {
+                    initReadFileHandle();
+                }
+                readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+            }
+
+            if (readSize < buffer.capacity()) {
+                long localPageOffset = offset - persistentSize;
+                int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize());
+                int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize());
+                Page page = getPage(localPageIndex);
+                if (page == null) {
+                    return readSize;
+                }
+                readSize += buffer.remaining();
+                buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
+            }
+        }
+        datasetMemoryManager.pageReferenced(resultSetPartitionId);
+        return readSize;
+    }
+
+    public synchronized void abort() {
+        failed.set(true);
+        notifyAll();
+    }
+
+    public synchronized Page returnPage() throws HyracksDataException {
+        Page page = removePage();
+
+        // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
+        if (page == null) {
+            ioManager.close(writeFileHandle);
+            return null;
+        }
+
+        page.getBuffer().flip();
+
+        if (fileRef == null) {
+            String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+            fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+            writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                    IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+            notifyAll();
+        }
+
+        long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer());
+        persistentSize += delta;
+        return page;
+    }
+
+    public synchronized void setEOS(boolean eos) {
+        this.eos.set(eos);
     }
 
     public ResultSetPartitionId getResultSetPartitionId() {
@@ -91,76 +251,6 @@
         return ioManager;
     }
 
-    public synchronized void incrementSize(long delta) {
-        size += delta;
-    }
-
-    public synchronized long getSize() {
-        return size;
-    }
-
-    public synchronized void incrementPersistentSize(long delta) {
-        persistentSize += delta;
-    }
-
-    public synchronized long getPersistentSize() {
-        return persistentSize;
-    }
-
-    public void setEOS(boolean eos) {
-        this.eos.set(eos);
-    }
-
-    public boolean getEOS() {
-        return eos.get();
-    }
-
-    public boolean getReadEOS() {
-        return readEOS.get();
-    }
-
-    public synchronized void addPage(Page page) {
-        localPageList.add(page);
-    }
-
-    public synchronized Page removePage(int index) {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.remove(index);
-        }
-        return page;
-    }
-
-    public synchronized Page getPage(int index) {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.get(index);
-        }
-        return page;
-    }
-
-    public synchronized Page getLastPage() {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.get(localPageList.size() - 1);
-        }
-        return page;
-    }
-
-    public synchronized Page getFirstPage() {
-        Page page = null;
-        if (!localPageList.isEmpty()) {
-            page = localPageList.get(0);
-        }
-        return page;
-    }
-
-    public synchronized FileReference getValidFileReference() throws InterruptedException {
-        while (fileRef == null)
-            wait();
-        return fileRef;
-    }
-
     @Override
     public JobId getJobId() {
         return resultSetPartitionId.getJobId();
@@ -185,4 +275,36 @@
     public void fromBytes(DataInput in) throws IOException {
         throw new UnsupportedOperationException();
     }
+
+    private Page getPage(int index) {
+        Page page = null;
+        if (!localPageList.isEmpty()) {
+            page = localPageList.get(index);
+        }
+        return page;
+    }
+
+    private Page removePage() {
+        Page page = null;
+        if (!localPageList.isEmpty()) {
+            page = localPageList.remove(localPageList.size() - 1);
+        }
+        return page;
+    }
+
+    private void initReadFileHandle() throws HyracksDataException {
+        while (fileRef == null && !failed.get()) {
+            try {
+                wait();
+            } catch (InterruptedException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        if (failed.get()) {
+            return;
+        }
+
+        readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
index 5b8b333..84baf49 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -24,6 +24,7 @@
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
@@ -95,6 +96,7 @@
         @Override
         public void accept(ByteBuffer buffer) {
             JobId jobId = new JobId(buffer.getLong());
+            ResultSetId rsId = new ResultSetId(buffer.getLong());
             int partition = buffer.getInt();
             if (LOGGER.isLoggable(Level.FINE)) {
                 LOGGER.fine("Received initial dataset partition read request for JobId: " + jobId + " partition: "
@@ -102,7 +104,7 @@
             }
             noc = new NetworkOutputChannel(ccb, 1);
             try {
-                partitionManager.initializeDatasetPartitionReader(jobId, partition, noc);
+                partitionManager.initializeDatasetPartitionReader(jobId, rsId, partition, noc);
             } catch (HyracksException e) {
                 noc.abort();
             }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
index 8f8c032..54ac99a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/AbortTasksWork.java
@@ -20,6 +20,7 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.Joblet;
@@ -46,6 +47,11 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Aborting Tasks: " + jobId + ":" + tasks);
         }
+        IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
+        if (dpm != null) {
+            ncs.getDatasetPartitionManager().abortReader(jobId);
+        }
+
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
         Joblet ji = jobletMap.get(jobId);
         if (ji != null) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
index 3957934..013544d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -14,6 +14,8 @@
  */
 package edu.uci.ics.hyracks.control.nc.work;
 
+import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
+import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.Task;
@@ -32,8 +34,12 @@
     @Override
     public void run() {
         try {
-            ncs.getClusterController().notifyTaskFailure(task.getJobletContext().getJobId(), task.getTaskAttemptId(),
-                    ncs.getId(), details);
+            JobId jobId = task.getJobletContext().getJobId();
+            IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
+            if (dpm != null) {
+                dpm.abortReader(jobId);
+            }
+            ncs.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), ncs.getId(), details);
         } catch (Exception e) {
             e.printStackTrace();
         }
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
index 07f6ba2..c403501 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameOutputStream.java
@@ -32,6 +32,9 @@
     }
 
     public void reset(ByteBuffer buffer, boolean clear) {
+        if (clear) {
+            buffer.clear();
+        }
         frameTupleAppender.reset(buffer, clear);
     }
 
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
index e74e54c..715f822 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -60,7 +60,7 @@
     private static IHyracksClientConnection hcc;
 
     private final List<File> outputFiles;
-    
+
     protected static int DEFAULT_MEM_PAGE_SIZE = 32768;
     protected static int DEFAULT_MEM_NUM_PAGES = 1000;
 
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 2847004..81c2367 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -647,4 +647,9 @@
             btree.validate();
         }
     }
+    
+    @Override
+    public String toString() {
+        return "LSMBTree [" + fileManager.getBaseDir() + "]"; 
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index e4dd369..12c7ea7 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -190,4 +190,9 @@
     public IBufferCache getBufferCache() {
         return diskBufferCache;
     }
+
+    @Override
+    public String toString() {
+        return "LSMIndex [" + fileManager.getBaseDir() + "]";
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 65e6a3d..7372b86 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -16,8 +16,9 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -35,6 +36,8 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 
 public class LSMHarness implements ILSMHarness {
+    private static final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName());
+
     private final ILSMIndexInternal lsmIndex;
     private final ILSMMergePolicy mergePolicy;
     private final ILSMOperationTracker opTracker;
@@ -171,7 +174,7 @@
         }
 
         lsmIndex.setFlushStatus(false);
-        
+
         if (!lsmIndex.scheduleFlush(ctx, callback)) {
             callback.beforeOperation();
             callback.afterOperation(null, null);
@@ -184,7 +187,11 @@
     public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
             IndexException {
         operation.getCallback().beforeOperation();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(lsmIndex + ": flushing");
+        }
         ILSMComponent newComponent = lsmIndex.flush(operation);
+        
         operation.getCallback().afterOperation(null, newComponent);
         lsmIndex.markAsValid(newComponent);
         operation.getCallback().afterFinalize(newComponent);
@@ -215,6 +222,9 @@
             IndexException {
         List<ILSMComponent> mergedComponents = new ArrayList<ILSMComponent>();
         operation.getCallback().beforeOperation();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(lsmIndex + ": merging");
+        }
         ILSMComponent newComponent = lsmIndex.merge(mergedComponents, operation);
         ctx.getComponentHolder().addAll(mergedComponents);
         operation.getCallback().afterOperation(mergedComponents, newComponent);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 33052f3..30fdd27 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -566,6 +566,7 @@
     public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
         private final ILSMComponent component;
         private final IIndexBulkLoader invIndexBulkLoader;
+        private boolean exceptionCaught = false;
 
         public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint)
                 throws IndexException {
@@ -599,6 +600,7 @@
         }
 
         protected void handleException() throws HyracksDataException {
+            exceptionCaught = true;
             ((LSMInvertedIndexImmutableComponent) component).getInvIndex().deactivate();
             ((LSMInvertedIndexImmutableComponent) component).getInvIndex().destroy();
             ((LSMInvertedIndexImmutableComponent) component).getDeletedKeysBTree().deactivate();
@@ -609,7 +611,9 @@
 
         @Override
         public void end() throws IndexException, HyracksDataException {
-            invIndexBulkLoader.end();
+            if (!exceptionCaught) {
+                invIndexBulkLoader.end();
+            }
             lsmHarness.addBulkLoadedComponent(component);
         }
     }
@@ -732,4 +736,9 @@
             component.getDeletedKeysBTree().validate();
         }
     }
+    
+    @Override
+    public String toString() {
+        return "LSMInvertedIndex [" + fileManager.getBaseDir() + "]"; 
+    }
 }
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 9b377a9..4b2e075 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -26,6 +26,7 @@
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeNonExistentKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
@@ -55,6 +56,7 @@
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree.RTreeAccessor;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
 public abstract class AbstractLSMRTree extends AbstractLSMIndex implements ITreeIndex {
@@ -279,6 +281,8 @@
             throw new UnsupportedOperationException("Physical delete not supported in the LSM-RTree");
         }
 
+        ctx.modificationCallback.before(tuple);
+        ctx.modificationCallback.found(null, tuple);
         if (ctx.getOperation() == IndexOperation.INSERT) {
             // Before each insert, we must check whether there exist a killer
             // tuple in the memBTree. If we find a killer tuple, we must truly
@@ -323,13 +327,16 @@
     }
 
     protected LSMRTreeOpContext createOpContext(IModificationOperationCallback modCallback) {
-        return new LSMRTreeOpContext((RTree.RTreeAccessor) mutableComponent.getRTree().createAccessor(modCallback,
-                NoOpOperationCallback.INSTANCE), (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame(),
+        RTreeAccessor rtreeAccessor = (RTree.RTreeAccessor) mutableComponent.getRTree().createAccessor(
+                NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        BTreeAccessor btreeAccessor = (BTree.BTreeAccessor) mutableComponent.getBTree().createAccessor(
+                NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+        
+        return new LSMRTreeOpContext(rtreeAccessor, (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame(),
                 (IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(), memFreePageManager
-                        .getMetaDataFrameFactory().createFrame(), 4, (BTree.BTreeAccessor) mutableComponent.getBTree()
-                        .createAccessor(modCallback, NoOpOperationCallback.INSTANCE), btreeLeafFrameFactory,
+                        .getMetaDataFrameFactory().createFrame(), 4, btreeAccessor, btreeLeafFrameFactory,
                 btreeInteriorFrameFactory, memFreePageManager.getMetaDataFrameFactory().createFrame(),
-                rtreeCmpFactories, btreeCmpFactories, null, null);
+                rtreeCmpFactories, btreeCmpFactories, modCallback, NoOpOperationCallback.INSTANCE);
     }
 
     @Override
@@ -355,4 +362,9 @@
         InMemoryBufferCache memBufferCache = (InMemoryBufferCache) mutableComponent.getRTree().getBufferCache();
         return memBufferCache.getNumPages() * memBufferCache.getPageSize();
     }
+    
+    @Override
+    public String toString() {
+        return "LSMRTree [" + fileManager.getBaseDir() + "]"; 
+    }
 }
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index b761f62..6c7d5fd 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -52,7 +52,7 @@
         ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
         ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
         ccConfig.defaultMaxJobAttempts = 0;
-        ccConfig.jobHistorySize = 0;
+        ccConfig.jobHistorySize = 1;
         ccConfig.profileDumpPeriod = -1;
 
         // cluster controller