Fixed a bug on unclosed running aggregation runtime; fixed an issue on
two adjacent exchange operators (connectors) when duplicate sort
operator is removed.
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index a684bc9..02e090d 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -104,6 +104,10 @@
foundTarget = false;
break;
}
+ if(child.getOperatorTag() == LogicalOperatorTag.GROUP){
+ foundTarget = false;
+ break;
+ }
if (orderSensitiveOps.contains(child.getOperatorTag())) {
orderSensitive = true;
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 98606f1..173bded 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -531,6 +531,10 @@
}
}
Mutable<ILogicalOperator> ci = op.getInputs().get(i);
+ if (((AbstractLogicalOperator) ci.getValue()).getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+ ci = ci.getValue().getInputs().get(0);
+ op.getInputs().set(i, ci);
+ }
ExchangeOperator exchg = new ExchangeOperator();
exchg.setPhysicalOperator(pop);
setNewOp(ci, exchg, context);
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index dc9c805..8973c75 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -131,7 +131,14 @@
@Override
public void close() {
-
+ for (int i = 0; i < pipelines.length; ++i) {
+ try {
+ outputWriter.setInputIdx(i);
+ pipelines[i].close();
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
}
};
}
@@ -232,7 +239,10 @@
@Override
public void close() throws HyracksDataException {
- // clearFrame();
+ if(outputAppender.getTupleCount() > 0){
+ FrameUtils.flushFrame(outputFrame, outputWriter);
+ outputAppender.reset(outputFrame, true);
+ }
}
public void setInputIdx(int inputIdx) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index cfc2bb6..600d641 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -90,7 +90,7 @@
@Override
public void open() throws HyracksDataException {
- if(!first){
+ if (!first) {
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 88dc19f..415e718 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -86,7 +86,6 @@
private IUnnestingEvaluator agg;
private ArrayTupleBuilder tupleBuilder;
- private int tupleCount;
private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
@Override
@@ -98,7 +97,6 @@
throw new HyracksDataException(ae);
}
tupleBuilder = new ArrayTupleBuilder(projectionList.length);
- tupleCount = 1;
writer.open();
}
@@ -120,6 +118,10 @@
try {
agg.init(tRef);
+ // assume that when unnesting the tuple, each step() call for each element
+ // in the tuple will increase the positionIndex, and the positionIndex will
+ // be reset when a new tuple is to be processed.
+ int positionIndex = 1;
boolean goon = true;
do {
tupleBuilder.reset();
@@ -146,7 +148,7 @@
if (hasPositionalVariable) {
// Write the positional variable as an INT32
tupleBuilder.getDataOutput().writeByte(3);
- tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
+ tupleBuilder.getDataOutput().writeInt(offset + positionIndex++);
tupleBuilder.addFieldEndOffset();
}
appendToFrameFromTupleBuilder(tupleBuilder);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index c7a9f6d..ba9ff49 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -163,6 +163,7 @@
FrameUtils.flushFrame(outFrame, writer);
}
}
+ aggregator.close();
aggregateState.close();
writer.close();
}