Merge branch 'yingyi/fullstack_fix' of https://code.google.com/p/hyracks into yingyi/fullstack_fix
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index c5326ce..956c74f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -30,6 +30,8 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
@@ -60,7 +62,6 @@
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent) {
-
         return emptyUnaryRequirements();
     }
 
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index a684bc9..02e090d 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -104,6 +104,10 @@
                     foundTarget = false;
                     break;
                 }
+                if(child.getOperatorTag() == LogicalOperatorTag.GROUP){
+                    foundTarget = false;
+                    break;
+                }
                 if (orderSensitiveOps.contains(child.getOperatorTag())) {
                     orderSensitive = true;
                 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 98606f1..810defc 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -270,6 +270,8 @@
             // Now, transfer annotations from the original sort op. to this one.
             AbstractLogicalOperator transferTo = nextOp;
             if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+                // 
+                // remove duplicate exchange operator
                 transferTo = (AbstractLogicalOperator) transferTo.getInputs().get(0).getValue();
             }
             transferTo.getAnnotations().putAll(op.getAnnotations());
@@ -345,6 +347,13 @@
             LocalOrderProperty orderProp = (LocalOrderProperty) dlvd.get(j);
             returnedProperties.add(new OrderColumn(orderProp.getColumn(), orderProp.getOrder()));
         }
+        // maintain other order columns after the required order columns
+        if(returnedProperties.size() != 0){
+            for(int j = prefix + 1; j < dlvdCols.size(); j++){
+                LocalOrderProperty orderProp = (LocalOrderProperty) dlvd.get(j);
+                returnedProperties.add(new OrderColumn(orderProp.getColumn(), orderProp.getOrder()));
+            }
+        }
         return returnedProperties;
     }
 
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
index 82e6970..e702d9f 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
@@ -43,7 +43,8 @@
             throws AlgebricksException {
         AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
         if (op1.getPhysicalOperator() == null
-                || op1.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_EXCHANGE) {
+                || (op1.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_EXCHANGE && op1
+                        .getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_MERGE_EXCHANGE)) {
             return false;
         }
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
@@ -51,6 +52,12 @@
                 || op2.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
             return false;
         }
+        if (op1.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.HASH_PARTITION_MERGE_EXCHANGE) {
+            // if it is a hash_partition_merge_exchange, the sort_merge_exchange can be simply removed
+            op1.getInputs().get(0).setValue(op2.getInputs().get(0).getValue());
+            op1.computeDeliveredPhysicalProperties(context);
+            return true;
+        }
         HashPartitionExchangePOperator hpe = (HashPartitionExchangePOperator) op1.getPhysicalOperator();
         SortMergeExchangePOperator sme = (SortMergeExchangePOperator) op2.getPhysicalOperator();
         List<OrderColumn> ocList = new ArrayList<OrderColumn>();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index dc9c805..8973c75 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -131,7 +131,14 @@
 
             @Override
             public void close() {
-
+                for (int i = 0; i < pipelines.length; ++i) {
+                    try {
+                        outputWriter.setInputIdx(i);
+                        pipelines[i].close();
+                    } catch (HyracksDataException e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
             }
         };
     }
@@ -232,7 +239,10 @@
 
         @Override
         public void close() throws HyracksDataException {
-            // clearFrame();
+            if(outputAppender.getTupleCount() > 0){
+                FrameUtils.flushFrame(outputFrame, outputWriter);
+                outputAppender.reset(outputFrame, true);
+            }
         }
 
         public void setInputIdx(int inputIdx) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index cfc2bb6..600d641 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -90,7 +90,7 @@
 
             @Override
             public void open() throws HyracksDataException {
-                if(!first){
+                if (!first) {
                     FrameUtils.flushFrame(frame, writer);
                     appender.reset(frame, true);
                 }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 88dc19f..415e718 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -86,7 +86,6 @@
             private IUnnestingEvaluator agg;
             private ArrayTupleBuilder tupleBuilder;
 
-            private int tupleCount;
             private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
 
             @Override
@@ -98,7 +97,6 @@
                     throw new HyracksDataException(ae);
                 }
                 tupleBuilder = new ArrayTupleBuilder(projectionList.length);
-                tupleCount = 1;
                 writer.open();
             }
 
@@ -120,6 +118,10 @@
 
                     try {
                         agg.init(tRef);
+                        // assume that when unnesting the tuple, each step() call for each element
+                        // in the tuple will increase the positionIndex, and the positionIndex will
+                        // be reset when a new tuple is to be processed.
+                        int positionIndex = 1;
                         boolean goon = true;
                         do {
                             tupleBuilder.reset();
@@ -146,7 +148,7 @@
                                 if (hasPositionalVariable) {
                                     // Write the positional variable as an INT32
                                     tupleBuilder.getDataOutput().writeByte(3);
-                                    tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
+                                    tupleBuilder.getDataOutput().writeInt(offset + positionIndex++);
                                     tupleBuilder.addFieldEndOffset();
                                 }
                                 appendToFrameFromTupleBuilder(tupleBuilder);
diff --git a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q21_suppliers_who_kept_orders_waiting.plan b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q21_suppliers_who_kept_orders_waiting.plan
index a22bf53..e4f2cd6 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q21_suppliers_who_kept_orders_waiting.plan
+++ b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q21_suppliers_who_kept_orders_waiting.plan
@@ -14,7 +14,7 @@
                }
         -- PRE_CLUSTERED_GROUP_BY[$$17]  |PARTITIONED|
           exchange 
-          -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$17(ASC)] HASH:[$$17]  |PARTITIONED|
+          -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$17(ASC), $$18(ASC)] HASH:[$$17]  |PARTITIONED|
             group by ([$$17 := %0->$$1; $$18 := %0->$$3]) decor ([]) {
                       aggregate [$$20] <- [function-call: hive:max(PARTIAL1), Args:[%0->$$3]]
                       -- AGGREGATE  |LOCAL|
@@ -50,7 +50,7 @@
                }
         -- PRE_CLUSTERED_GROUP_BY[$$17]  |PARTITIONED|
           exchange 
-          -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$17(ASC)] HASH:[$$17]  |PARTITIONED|
+          -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$17(ASC), $$18(ASC)] HASH:[$$17]  |PARTITIONED|
             group by ([$$17 := %0->$$1; $$18 := %0->$$3]) decor ([]) {
                       aggregate [$$20] <- [function-call: hive:max(PARTIAL1), Args:[%0->$$3]]
                       -- AGGREGATE  |LOCAL|
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index c7a9f6d..ba9ff49 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -163,6 +163,7 @@
                 FrameUtils.flushFrame(outFrame, writer);
             }
         }
+        aggregator.close();
         aggregateState.close();
         writer.close();
     }
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMInteriorFrame.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMInteriorFrame.java
index 3f651df..cdaf144 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMInteriorFrame.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMInteriorFrame.java
@@ -228,10 +228,6 @@
             // On the left page, remove the highest key and make its child pointer
             // the rightmost child pointer.
             buf.putInt(tupleCountOff, tuplesToLeft);
-
-            // Compact both pages.
-            rightFrame.compact();
-            compact();
         }
         // Copy the split key to be inserted.
         // We must do so because setting the new split key will overwrite the
@@ -251,6 +247,9 @@
         buf.putInt(rightLeafOff, buf.getInt(getLeftChildPageOff(frameTuple)));
         buf.putInt(tupleCountOff, tuplesToLeft - 1);
 
+        // Compact both pages.
+        rightFrame.compact();
+        compact();
         // Insert the saved split key.
         int targetTupleIndex;
         // it's safe to catch this exception since it will have been caught before reaching here