Fixed a bug on unclosed running aggregation runtime; fixed an issue on
two adjacent exchange operators (connectors) when duplicate sort
operator is removed. 
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index a684bc9..02e090d 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -104,6 +104,10 @@
                     foundTarget = false;
                     break;
                 }
+                if(child.getOperatorTag() == LogicalOperatorTag.GROUP){
+                    foundTarget = false;
+                    break;
+                }
                 if (orderSensitiveOps.contains(child.getOperatorTag())) {
                     orderSensitive = true;
                 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 98606f1..173bded 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -531,6 +531,10 @@
                 }
             }
             Mutable<ILogicalOperator> ci = op.getInputs().get(i);
+            if (((AbstractLogicalOperator) ci.getValue()).getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+                ci = ci.getValue().getInputs().get(0);
+                op.getInputs().set(i, ci);
+            }
             ExchangeOperator exchg = new ExchangeOperator();
             exchg.setPhysicalOperator(pop);
             setNewOp(ci, exchg, context);
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index dc9c805..8973c75 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -131,7 +131,14 @@
 
             @Override
             public void close() {
-
+                for (int i = 0; i < pipelines.length; ++i) {
+                    try {
+                        outputWriter.setInputIdx(i);
+                        pipelines[i].close();
+                    } catch (HyracksDataException e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
             }
         };
     }
@@ -232,7 +239,10 @@
 
         @Override
         public void close() throws HyracksDataException {
-            // clearFrame();
+            if(outputAppender.getTupleCount() > 0){
+                FrameUtils.flushFrame(outputFrame, outputWriter);
+                outputAppender.reset(outputFrame, true);
+            }
         }
 
         public void setInputIdx(int inputIdx) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index cfc2bb6..600d641 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -90,7 +90,7 @@
 
             @Override
             public void open() throws HyracksDataException {
-                if(!first){
+                if (!first) {
                     FrameUtils.flushFrame(frame, writer);
                     appender.reset(frame, true);
                 }
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 88dc19f..415e718 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -86,7 +86,6 @@
             private IUnnestingEvaluator agg;
             private ArrayTupleBuilder tupleBuilder;
 
-            private int tupleCount;
             private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
 
             @Override
@@ -98,7 +97,6 @@
                     throw new HyracksDataException(ae);
                 }
                 tupleBuilder = new ArrayTupleBuilder(projectionList.length);
-                tupleCount = 1;
                 writer.open();
             }
 
@@ -120,6 +118,10 @@
 
                     try {
                         agg.init(tRef);
+                        // assume that when unnesting the tuple, each step() call for each element
+                        // in the tuple will increase the positionIndex, and the positionIndex will
+                        // be reset when a new tuple is to be processed.
+                        int positionIndex = 1;
                         boolean goon = true;
                         do {
                             tupleBuilder.reset();
@@ -146,7 +148,7 @@
                                 if (hasPositionalVariable) {
                                     // Write the positional variable as an INT32
                                     tupleBuilder.getDataOutput().writeByte(3);
-                                    tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
+                                    tupleBuilder.getDataOutput().writeInt(offset + positionIndex++);
                                     tupleBuilder.addFieldEndOffset();
                                 }
                                 appendToFrameFromTupleBuilder(tupleBuilder);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index c7a9f6d..ba9ff49 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -163,6 +163,7 @@
                 FrameUtils.flushFrame(outFrame, writer);
             }
         }
+        aggregator.close();
         aggregateState.close();
         writer.close();
     }