Merge branch 'yingyi/fullstack_fix' of https://code.google.com/p/hyracks into yingyi/fullstack_fix
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
index c5326ce..956c74f 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RunningAggregatePOperator.java
@@ -30,6 +30,8 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
@@ -60,7 +62,6 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {
-
return emptyUnaryRequirements();
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
index a684bc9..02e090d 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -104,6 +104,10 @@
foundTarget = false;
break;
}
+ if(child.getOperatorTag() == LogicalOperatorTag.GROUP){
+ foundTarget = false;
+ break;
+ }
if (orderSensitiveOps.contains(child.getOperatorTag())) {
orderSensitive = true;
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 98606f1..810defc 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -270,6 +270,8 @@
// Now, transfer annotations from the original sort op. to this one.
AbstractLogicalOperator transferTo = nextOp;
if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+ //
+ // remove duplicate exchange operator
transferTo = (AbstractLogicalOperator) transferTo.getInputs().get(0).getValue();
}
transferTo.getAnnotations().putAll(op.getAnnotations());
@@ -345,6 +347,13 @@
LocalOrderProperty orderProp = (LocalOrderProperty) dlvd.get(j);
returnedProperties.add(new OrderColumn(orderProp.getColumn(), orderProp.getOrder()));
}
+ // maintain other order columns after the required order columns
+ if(returnedProperties.size() != 0){
+ for(int j = prefix + 1; j < dlvdCols.size(); j++){
+ LocalOrderProperty orderProp = (LocalOrderProperty) dlvd.get(j);
+ returnedProperties.add(new OrderColumn(orderProp.getColumn(), orderProp.getOrder()));
+ }
+ }
return returnedProperties;
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
index 82e6970..e702d9f 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/IntroHashPartitionMergeExchange.java
@@ -43,7 +43,8 @@
throws AlgebricksException {
AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
if (op1.getPhysicalOperator() == null
- || op1.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_EXCHANGE) {
+ || (op1.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_EXCHANGE && op1
+ .getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.HASH_PARTITION_MERGE_EXCHANGE)) {
return false;
}
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
@@ -51,6 +52,12 @@
|| op2.getPhysicalOperator().getOperatorTag() != PhysicalOperatorTag.SORT_MERGE_EXCHANGE) {
return false;
}
+ if (op1.getPhysicalOperator().getOperatorTag() == PhysicalOperatorTag.HASH_PARTITION_MERGE_EXCHANGE) {
+ // if it is a hash_partition_merge_exchange, the sort_merge_exchange can be simply removed
+ op1.getInputs().get(0).setValue(op2.getInputs().get(0).getValue());
+ op1.computeDeliveredPhysicalProperties(context);
+ return true;
+ }
HashPartitionExchangePOperator hpe = (HashPartitionExchangePOperator) op1.getPhysicalOperator();
SortMergeExchangePOperator sme = (SortMergeExchangePOperator) op2.getPhysicalOperator();
List<OrderColumn> ocList = new ArrayList<OrderColumn>();
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index dc9c805..8973c75 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -131,7 +131,14 @@
@Override
public void close() {
-
+ for (int i = 0; i < pipelines.length; ++i) {
+ try {
+ outputWriter.setInputIdx(i);
+ pipelines[i].close();
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
}
};
}
@@ -232,7 +239,10 @@
@Override
public void close() throws HyracksDataException {
- // clearFrame();
+ if(outputAppender.getTupleCount() > 0){
+ FrameUtils.flushFrame(outputFrame, outputWriter);
+ outputAppender.reset(outputFrame, true);
+ }
}
public void setInputIdx(int inputIdx) {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index cfc2bb6..600d641 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -90,7 +90,7 @@
@Override
public void open() throws HyracksDataException {
- if(!first){
+ if (!first) {
FrameUtils.flushFrame(frame, writer);
appender.reset(frame, true);
}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 88dc19f..415e718 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -86,7 +86,6 @@
private IUnnestingEvaluator agg;
private ArrayTupleBuilder tupleBuilder;
- private int tupleCount;
private IScalarEvaluator offsetEval = posOffsetEvalFactory.createScalarEvaluator(ctx);
@Override
@@ -98,7 +97,6 @@
throw new HyracksDataException(ae);
}
tupleBuilder = new ArrayTupleBuilder(projectionList.length);
- tupleCount = 1;
writer.open();
}
@@ -120,6 +118,10 @@
try {
agg.init(tRef);
+ // assume that when unnesting the tuple, each step() call for each element
+ // in the tuple will increase the positionIndex, and the positionIndex will
+ // be reset when a new tuple is to be processed.
+ int positionIndex = 1;
boolean goon = true;
do {
tupleBuilder.reset();
@@ -146,7 +148,7 @@
if (hasPositionalVariable) {
// Write the positional variable as an INT32
tupleBuilder.getDataOutput().writeByte(3);
- tupleBuilder.getDataOutput().writeInt(offset + tupleCount++);
+ tupleBuilder.getDataOutput().writeInt(offset + positionIndex++);
tupleBuilder.addFieldEndOffset();
}
appendToFrameFromTupleBuilder(tupleBuilder);
diff --git a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q21_suppliers_who_kept_orders_waiting.plan b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q21_suppliers_who_kept_orders_waiting.plan
index a22bf53..e4f2cd6 100644
--- a/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q21_suppliers_who_kept_orders_waiting.plan
+++ b/hivesterix/hivesterix-dist/src/test/resources/optimizerts/results/q21_suppliers_who_kept_orders_waiting.plan
@@ -14,7 +14,7 @@
}
-- PRE_CLUSTERED_GROUP_BY[$$17] |PARTITIONED|
exchange
- -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$17(ASC)] HASH:[$$17] |PARTITIONED|
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$17(ASC), $$18(ASC)] HASH:[$$17] |PARTITIONED|
group by ([$$17 := %0->$$1; $$18 := %0->$$3]) decor ([]) {
aggregate [$$20] <- [function-call: hive:max(PARTIAL1), Args:[%0->$$3]]
-- AGGREGATE |LOCAL|
@@ -50,7 +50,7 @@
}
-- PRE_CLUSTERED_GROUP_BY[$$17] |PARTITIONED|
exchange
- -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$17(ASC)] HASH:[$$17] |PARTITIONED|
+ -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$17(ASC), $$18(ASC)] HASH:[$$17] |PARTITIONED|
group by ([$$17 := %0->$$1; $$18 := %0->$$3]) decor ([]) {
aggregate [$$20] <- [function-call: hive:max(PARTIAL1), Args:[%0->$$3]]
-- AGGREGATE |LOCAL|
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index c7a9f6d..ba9ff49 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -163,6 +163,7 @@
FrameUtils.flushFrame(outFrame, writer);
}
}
+ aggregator.close();
aggregateState.close();
writer.close();
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMInteriorFrame.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMInteriorFrame.java
index 3f651df..cdaf144 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMInteriorFrame.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/frames/BTreeNSMInteriorFrame.java
@@ -228,10 +228,6 @@
// On the left page, remove the highest key and make its child pointer
// the rightmost child pointer.
buf.putInt(tupleCountOff, tuplesToLeft);
-
- // Compact both pages.
- rightFrame.compact();
- compact();
}
// Copy the split key to be inserted.
// We must do so because setting the new split key will overwrite the
@@ -251,6 +247,9 @@
buf.putInt(rightLeafOff, buf.getInt(getLeftChildPageOff(frameTuple)));
buf.putInt(tupleCountOff, tuplesToLeft - 1);
+ // Compact both pages.
+ rightFrame.compact();
+ compact();
// Insert the saved split key.
int targetTupleIndex;
// it's safe to catch this exception since it will have been caught before reaching here