address Madhu's review comments
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
index 9cf11ca..0c105d0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AggregateOperator.java
@@ -92,7 +92,4 @@
         return env;
     }
 
-    public void setExecutionMode(ExecutionMode mode) {
-        super.setExecutionMode(mode);
-    }
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
index a772db6..531b300 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/StreamLimitPOperator.java
@@ -38,10 +38,8 @@
 
 public class StreamLimitPOperator extends AbstractPhysicalOperator {
 
-    private boolean global;
+    public StreamLimitPOperator() {
 
-    public StreamLimitPOperator(boolean global) {
-        this.global = global;
     }
 
     @Override
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
index acfdbd3..e5f60bc 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/PushLimitDownRule.java
@@ -106,7 +106,7 @@
                     opLim.getMaxObjects(), opLim.getOffset());
             clone2 = new LimitOperator(maxPlusOffset, false);
         }
-        clone2.setPhysicalOperator(new StreamLimitPOperator(false));
+        clone2.setPhysicalOperator(new StreamLimitPOperator());
         clone2.getInputs().add(new MutableObject<ILogicalOperator>(op2));
         clone2.setExecutionMode(op2.getExecutionMode());
         clone2.recomputeSchema();
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index c19ef65..9c8ad46 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -21,7 +21,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
@@ -172,9 +171,7 @@
                     break;
                 }
                 case LIMIT: {
-                    LimitOperator opLim = (LimitOperator) op;
-                    op.setPhysicalOperator(new StreamLimitPOperator(opLim.isTopmostLimitOp()
-                            && opLim.getExecutionMode() == ExecutionMode.UNPARTITIONED));
+                    op.setPhysicalOperator(new StreamLimitPOperator());
                     break;
                 }
                 case NESTEDTUPLESOURCE: {
diff --git a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
index e81c405..a049f15 100644
--- a/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
+++ b/hivesterix/hivesterix-optimizer/src/main/java/edu/uci/ics/hivesterix/optimizer/rulecollections/HiveRuleCollections.java
@@ -37,77 +37,76 @@
 

 public final class HiveRuleCollections {

 

-	public final static LinkedList<IAlgebraicRewriteRule> NORMALIZATION = new LinkedList<IAlgebraicRewriteRule>();

-	static {

-		NORMALIZATION.add(new EliminateSubplanRule());

-		NORMALIZATION.add(new BreakSelectIntoConjunctsRule());

-		NORMALIZATION.add(new PushSelectIntoJoinRule());

-		NORMALIZATION.add(new ExtractGbyExpressionsRule());

-		NORMALIZATION.add(new RemoveRedundantSelectRule());

-	}

+    public final static LinkedList<IAlgebraicRewriteRule> NORMALIZATION = new LinkedList<IAlgebraicRewriteRule>();

+    static {

+        NORMALIZATION.add(new EliminateSubplanRule());

+        NORMALIZATION.add(new BreakSelectIntoConjunctsRule());

+        NORMALIZATION.add(new PushSelectIntoJoinRule());

+        NORMALIZATION.add(new ExtractGbyExpressionsRule());

+        NORMALIZATION.add(new RemoveRedundantSelectRule());

+    }

 

-	public final static LinkedList<IAlgebraicRewriteRule> COND_PUSHDOWN_AND_JOIN_INFERENCE = new LinkedList<IAlgebraicRewriteRule>();

-	static {

-		COND_PUSHDOWN_AND_JOIN_INFERENCE.add(new PushSelectDownRule());

-		COND_PUSHDOWN_AND_JOIN_INFERENCE.add(new InlineVariablesRule());

-		COND_PUSHDOWN_AND_JOIN_INFERENCE

-				.add(new FactorRedundantGroupAndDecorVarsRule());

-		COND_PUSHDOWN_AND_JOIN_INFERENCE.add(new EliminateSubplanRule());

-	}

+    public final static LinkedList<IAlgebraicRewriteRule> COND_PUSHDOWN_AND_JOIN_INFERENCE = new LinkedList<IAlgebraicRewriteRule>();

+    static {

+        COND_PUSHDOWN_AND_JOIN_INFERENCE.add(new PushSelectDownRule());

+        COND_PUSHDOWN_AND_JOIN_INFERENCE.add(new InlineVariablesRule());

+        COND_PUSHDOWN_AND_JOIN_INFERENCE.add(new FactorRedundantGroupAndDecorVarsRule());

+        COND_PUSHDOWN_AND_JOIN_INFERENCE.add(new EliminateSubplanRule());

+    }

 

-	public final static LinkedList<IAlgebraicRewriteRule> LOAD_FIELDS = new LinkedList<IAlgebraicRewriteRule>();

-	static {

-		// should LoadRecordFieldsRule be applied in only one pass over the

-		// plan?

-		LOAD_FIELDS.add(new InlineVariablesRule());

-		// LOAD_FIELDS.add(new RemoveUnusedAssignAndAggregateRule());

-		LOAD_FIELDS.add(new ComplexJoinInferenceRule());

-		LOAD_FIELDS.add(new InferTypesRule());

-	}

+    public final static LinkedList<IAlgebraicRewriteRule> LOAD_FIELDS = new LinkedList<IAlgebraicRewriteRule>();

+    static {

+        // should LoadRecordFieldsRule be applied in only one pass over the

+        // plan?

+        LOAD_FIELDS.add(new InlineVariablesRule());

+        // LOAD_FIELDS.add(new RemoveUnusedAssignAndAggregateRule());

+        LOAD_FIELDS.add(new ComplexJoinInferenceRule());

+        LOAD_FIELDS.add(new InferTypesRule());

+    }

 

-	public final static LinkedList<IAlgebraicRewriteRule> OP_PUSHDOWN = new LinkedList<IAlgebraicRewriteRule>();

-	static {

-		OP_PUSHDOWN.add(new PushProjectDownRule());

-		OP_PUSHDOWN.add(new PushSelectDownRule());

-	}

+    public final static LinkedList<IAlgebraicRewriteRule> OP_PUSHDOWN = new LinkedList<IAlgebraicRewriteRule>();

+    static {

+        OP_PUSHDOWN.add(new PushProjectDownRule());

+        OP_PUSHDOWN.add(new PushSelectDownRule());

+    }

 

-	public final static LinkedList<IAlgebraicRewriteRule> DATA_EXCHANGE = new LinkedList<IAlgebraicRewriteRule>();

-	static {

-		DATA_EXCHANGE.add(new SetExecutionModeRule());

-	}

+    public final static LinkedList<IAlgebraicRewriteRule> DATA_EXCHANGE = new LinkedList<IAlgebraicRewriteRule>();

+    static {

+        DATA_EXCHANGE.add(new SetExecutionModeRule());

+    }

 

-	public final static LinkedList<IAlgebraicRewriteRule> CONSOLIDATION = new LinkedList<IAlgebraicRewriteRule>();

-	static {

-		CONSOLIDATION.add(new RemoveRedundantProjectionRule());

-		CONSOLIDATION.add(new ConsolidateSelectsRule());

-		CONSOLIDATION.add(new IntroduceEarlyProjectRule());

-		CONSOLIDATION.add(new ConsolidateAssignsRule());

-		CONSOLIDATION.add(new IntroduceGroupByCombinerRule());

-		CONSOLIDATION.add(new IntroduceAggregateCombinerRule());

-		CONSOLIDATION.add(new RemoveUnusedAssignAndAggregateRule());

-	}

+    public final static LinkedList<IAlgebraicRewriteRule> CONSOLIDATION = new LinkedList<IAlgebraicRewriteRule>();

+    static {

+        CONSOLIDATION.add(new RemoveRedundantProjectionRule());

+        CONSOLIDATION.add(new ConsolidateSelectsRule());

+        CONSOLIDATION.add(new IntroduceEarlyProjectRule());

+        CONSOLIDATION.add(new ConsolidateAssignsRule());

+        CONSOLIDATION.add(new IntroduceGroupByCombinerRule());

+        CONSOLIDATION.add(new IntroduceAggregateCombinerRule());

+        CONSOLIDATION.add(new RemoveUnusedAssignAndAggregateRule());

+    }

 

-	public final static LinkedList<IAlgebraicRewriteRule> PHYSICAL_PLAN_REWRITES = new LinkedList<IAlgebraicRewriteRule>();

-	static {

-		PHYSICAL_PLAN_REWRITES.add(new PullSelectOutOfEqJoin());

-		PHYSICAL_PLAN_REWRITES.add(new SetAlgebricksPhysicalOperatorsRule());

-		PHYSICAL_PLAN_REWRITES.add(new EnforceStructuralPropertiesRule());

-		PHYSICAL_PLAN_REWRITES.add(new PushProjectDownRule());

-		PHYSICAL_PLAN_REWRITES.add(new SetAlgebricksPhysicalOperatorsRule());

-		PHYSICAL_PLAN_REWRITES.add(new PushLimitDownRule());

-		PHYSICAL_PLAN_REWRITES.add(new InsertProjectBeforeWriteRule());

-		PHYSICAL_PLAN_REWRITES.add(new InsertProjectBeforeUnionRule());

-	}

+    public final static LinkedList<IAlgebraicRewriteRule> PHYSICAL_PLAN_REWRITES = new LinkedList<IAlgebraicRewriteRule>();

+    static {

+        PHYSICAL_PLAN_REWRITES.add(new PullSelectOutOfEqJoin());

+        PHYSICAL_PLAN_REWRITES.add(new SetAlgebricksPhysicalOperatorsRule());

+        PHYSICAL_PLAN_REWRITES.add(new EnforceStructuralPropertiesRule());

+        PHYSICAL_PLAN_REWRITES.add(new PushProjectDownRule());

+        PHYSICAL_PLAN_REWRITES.add(new SetAlgebricksPhysicalOperatorsRule());

+        PHYSICAL_PLAN_REWRITES.add(new PushLimitDownRule());

+        PHYSICAL_PLAN_REWRITES.add(new InsertProjectBeforeWriteRule());

+        PHYSICAL_PLAN_REWRITES.add(new InsertProjectBeforeUnionRule());

+    }

 

-	public final static LinkedList<IAlgebraicRewriteRule> prepareJobGenRules = new LinkedList<IAlgebraicRewriteRule>();

-	static {

-		prepareJobGenRules.add(new ReinferAllTypesRule());

-		prepareJobGenRules.add(new IsolateHyracksOperatorsRule(

-				HeuristicOptimizer.hyraxOperatorsBelowWhichJobGenIsDisabled));

-		prepareJobGenRules.add(new ExtractCommonOperatorsRule());

-		prepareJobGenRules.add(new LocalGroupByRule());

-		prepareJobGenRules.add(new PushProjectIntoDataSourceScanRule());

-		prepareJobGenRules.add(new ReinferAllTypesRule());

-	}

+    public final static LinkedList<IAlgebraicRewriteRule> prepareJobGenRules = new LinkedList<IAlgebraicRewriteRule>();

+    static {

+        prepareJobGenRules.add(new ReinferAllTypesRule());

+        prepareJobGenRules.add(new IsolateHyracksOperatorsRule(

+                HeuristicOptimizer.hyraxOperatorsBelowWhichJobGenIsDisabled));

+        prepareJobGenRules.add(new ExtractCommonOperatorsRule());

+        prepareJobGenRules.add(new LocalGroupByRule());

+        prepareJobGenRules.add(new PushProjectIntoDataSourceScanRule());

+        prepareJobGenRules.add(new ReinferAllTypesRule());

+    }

 

 }

diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index d099645..8410e1e 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -52,7 +52,7 @@
         ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
         ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
         ccConfig.defaultMaxJobAttempts = 0;
-        ccConfig.jobHistorySize = 0;
+        ccConfig.jobHistorySize = 1;
         ccConfig.profileDumpPeriod = -1;
 
         // cluster controller