[ASTERIXDB-2286][COMP] Parallel sort changes p.2
- user model changes: no
- storage format changes: no
- interface changes: no
details:
This patch is change the way the SequentialMergeExchangePOperator
connector computes its local property instead of blindly
propagating the child's local property.
The patch also includes minor code clean-ups (moved some methods down)
Change-Id: Ie37e03b6fc6e55fc21f8324c0f09a7fa05b51769
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3005
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/p_sort_seq_merge/p_sort_seq_merge.sqlpp b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/p_sort_seq_merge/p_sort_seq_merge.sqlpp
new file mode 100644
index 0000000..473a52a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/p_sort_seq_merge/p_sort_seq_merge.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description: testing a sequential merge when parallel sort has redistributed the data across partitions and one of
+ * the next operators requires merging the data. The local order property is not present, but ORDERED_PARTITIONED is
+ * present, and sequential merge connector will be introduced instead of a random merge.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type TestType as
+{
+ id: int,
+ f1: int
+};
+
+create dataset TestDS(TestType) primary key id;
+
+set `compiler.sort.parallel` "true";
+
+[(select * from TestDS v order by v.f1, v.id)];
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/p_sort_seq_merge/p_sort_seq_merge.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/p_sort_seq_merge/p_sort_seq_merge.plan
new file mode 100644
index 0000000..d3d1d85
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/p_sort_seq_merge/p_sort_seq_merge.plan
@@ -0,0 +1,34 @@
+-- DISTRIBUTE_RESULT |UNPARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |UNPARTITIONED|
+ -- STREAM_PROJECT |UNPARTITIONED|
+ -- ASSIGN |UNPARTITIONED|
+ -- AGGREGATE |UNPARTITIONED|
+ -- SEQUENTIAL_MERGE_EXCHANGE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- STABLE_SORT [$$18(ASC), $$17(ASC)] |PARTITIONED|
+ -- RANGE_PARTITION_EXCHANGE [$$18(ASC), $$17(ASC)] |PARTITIONED|
+ -- FORWARD |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
+ -- BROADCAST_EXCHANGE |PARTITIONED|
+ -- AGGREGATE |UNPARTITIONED|
+ -- RANDOM_MERGE_EXCHANGE |PARTITIONED|
+ -- AGGREGATE |PARTITIONED|
+ -- STREAM_PROJECT |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- REPLICATE |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- ASSIGN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- DATASOURCE_SCAN |PARTITIONED|
+ -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
+ -- EMPTY_TUPLE_SOURCE |PARTITIONED|
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp
index af54590..9f45ee3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/p_sort_seq_merge/p_sort_seq_merge.1.ddl.sqlpp
@@ -19,7 +19,8 @@
/*
* Description: testing a sequential merge when parallel sort has redistributed the data across partitions and one of
- * the next operators requires merging the sorted data.
+ * the next operators requires merging the data. The local order property is not present, but ORDERED_PARTITIONED is
+ * present, and sequential merge connector will be introduced instead of a random merge.
*/
drop dataverse test if exists;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index 4c58ad7..66c95ee 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -64,7 +64,7 @@
COMPILER_SORT_SAMPLES(
INTEGER,
AlgebricksConfig.SORT_SAMPLES,
- "The number of samples parallel sorting should " + "take from each partition");
+ "The number of samples which parallel sorting should take from each partition");
private final IOptionType type;
private final Object defaultValue;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
index c967a94..d33a6f7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
@@ -118,12 +118,12 @@
@Override
public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx)
throws HyracksDataException {
- return new GlobalSamplingAggregateFunction(args, ctx, ascendingFlags, numOfPartitions, numOrderFields);
+ return new RangeMapFunction(args, ctx, ascendingFlags, numOfPartitions, numOrderFields);
}
};
}
- private class GlobalSamplingAggregateFunction implements IAggregateEvaluator {
+ private class RangeMapFunction implements IAggregateEvaluator {
private final IScalarEvaluator localSamplesEval;
private final IPointable localSamples;
private final List<List<byte[]>> finalSamples;
@@ -138,8 +138,8 @@
private final ArrayBackedValueStorage storage;
@SuppressWarnings("unchecked")
- private GlobalSamplingAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
- boolean[] ascending, int numOfPartitions, int numOrderByFields) throws HyracksDataException {
+ private RangeMapFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean[] ascending,
+ int numOfPartitions, int numOrderByFields) throws HyracksDataException {
localSamples = new VoidPointable();
localSamplesEval = args[0].createScalarEvaluator(context);
finalSamples = new ArrayList<>();
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
index df0b446..df9141b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SequentialMergeExchangePOperator.java
@@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -31,7 +30,11 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty.PartitioningType;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
@@ -39,6 +42,9 @@
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.connectors.MToOneSequentialMergingConnectorDescriptor;
+/**
+ * A merging connector that merges the tuples sequentially from the partitions starting from the partition at index 0.
+ */
public class SequentialMergeExchangePOperator extends AbstractExchangePOperator {
@Override
public PhysicalOperatorTag getOperatorTag() {
@@ -47,30 +53,75 @@
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+ IPhysicalPropertiesVector requiredByParent, IOptimizationContext context) {
return emptyUnaryRequirements();
}
+ /**
+ * <Pre>
+ * The local properties delivered by this connector are either:
+ * 1. nothing if the child doesn't deliver any special property
+ * 2. order property if the child is locally ordered and globally ordered on the same prefix
+ *
+ * The partitioning property is always UNPARTITIONED since it's a merging connector
+ * </Pre>
+ * @param op the logical operator of this physical operator
+ * @param context optimization context, not used here
+ */
@Override
- public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
- throws AlgebricksException {
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
AbstractLogicalOperator childOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
List<ILocalStructuralProperty> childLocalProps = childOp.getDeliveredPhysicalProperties().getLocalProperties();
- List<ILocalStructuralProperty> localProperties;
- if (childLocalProps != null) {
- localProperties = new ArrayList<>(childLocalProps);
- } else {
- localProperties = new ArrayList<>(0);
+ IPartitioningProperty childPartitioning = childOp.getDeliveredPhysicalProperties().getPartitioningProperty();
+ List<ILocalStructuralProperty> outputLocalProp = new ArrayList<>(0);
+ if (childLocalProps != null && !childLocalProps.isEmpty() && childPartitioning != null
+ && childPartitioning.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED) {
+ // the child could have a local order property that matches its global order property
+ propagateChildProperties((OrderedPartitionedProperty) childPartitioning, childLocalProps, outputLocalProp);
}
- deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, localProperties);
+ deliveredProperties = new StructuralPropertiesVector(IPartitioningProperty.UNPARTITIONED, outputLocalProp);
}
@Override
public Pair<IConnectorDescriptor, IHyracksJobBuilder.TargetConstraint> createConnectorDescriptor(
- IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context)
- throws AlgebricksException {
+ IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) {
IConnectorDescriptor connector = new MToOneSequentialMergingConnectorDescriptor(spec);
return new Pair<>(connector, IHyracksJobBuilder.TargetConstraint.ONE);
}
+
+ /**
+ * Matches prefix of the child's local order property & global order property. If a prefix is determined, the
+ * local order property is propagated through this connector. In essence, the connector says it maintains the
+ * order originally present in the child.
+ * @param childPartitioning the global ordering property of the child made by ORDERED_PARTITIONED partitioning
+ * @param childLocalProps the local properties inside the partitions
+ * @param outputLocalProp the local property of the connector that will be modified if propagating prop. happens
+ */
+ private void propagateChildProperties(OrderedPartitionedProperty childPartitioning,
+ List<ILocalStructuralProperty> childLocalProps, List<ILocalStructuralProperty> outputLocalProp) {
+ ILocalStructuralProperty childLocalProp = childLocalProps.get(0);
+ // skip if the first property is a grouping property
+ if (childLocalProp.getPropertyType() == ILocalStructuralProperty.PropertyType.LOCAL_ORDER_PROPERTY) {
+ OrderColumn localOrderColumn;
+ List<OrderColumn> outputOrderColumns = new ArrayList<>();
+ List<OrderColumn> globalOrderColumns = childPartitioning.getOrderColumns();
+ List<OrderColumn> localOrderColumns = ((LocalOrderProperty) childLocalProp).getOrderColumns();
+ // start matching the order columns
+ for (int i = 0; i < localOrderColumns.size() && i < globalOrderColumns.size(); i++) {
+ localOrderColumn = localOrderColumns.get(i);
+ if (localOrderColumn.equals(globalOrderColumns.get(i))) {
+ outputOrderColumns.add(localOrderColumn);
+ } else {
+ // stop whenever the matching fails, end of prefix matching
+ break;
+ }
+ }
+
+ if (!outputOrderColumns.isEmpty()) {
+ // found a prefix
+ outputLocalProp.add(new LocalOrderProperty(outputOrderColumns));
+ }
+ }
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 52b3f59..96e2e53 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -167,60 +167,6 @@
return changed;
}
- private boolean physOptimizePlan(ILogicalPlan plan, IPhysicalPropertiesVector pvector, boolean nestedPlan,
- IOptimizationContext context) throws AlgebricksException {
- boolean loggerTraceEnabled = AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled();
- boolean changed = false;
- for (Mutable<ILogicalOperator> root : plan.getRoots()) {
- if (physOptimizeOp(root, pvector, nestedPlan, context)) {
- changed = true;
- }
- AbstractLogicalOperator op = (AbstractLogicalOperator) root.getValue();
- op.computeDeliveredPhysicalProperties(context);
- if (loggerTraceEnabled) {
- AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">>>> Structural properties for " + op.getPhysicalOperator()
- + ": " + op.getDeliveredPhysicalProperties() + "\n");
- }
- }
- return changed;
- }
-
- // Gets the index of a child to start top-down data property enforcement.
- // If there is a partitioning-compatible child with the operator in opRef,
- // start from this child; otherwise, start from child zero.
- private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan,
- IOptimizationContext context) throws AlgebricksException {
- IPhysicalPropertiesVector[] reqdProperties = null;
- if (pr != null) {
- reqdProperties = pr.getRequiredProperties();
- }
-
- List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<>();
- for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
- AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
- deliveredPartitioningPropertiesFromChildren
- .add(child.getDeliveredPhysicalProperties().getPartitioningProperty());
- }
- int partitioningCompatibleChild = 0;
- for (int i = 0; i < op.getInputs().size(); i++) {
- IPartitioningProperty deliveredPropertyFromChild = deliveredPartitioningPropertiesFromChildren.get(i);
- if (reqdProperties == null || reqdProperties[i] == null
- || reqdProperties[i].getPartitioningProperty() == null || deliveredPropertyFromChild == null
- || reqdProperties[i].getPartitioningProperty()
- .getPartitioningType() != deliveredPartitioningPropertiesFromChildren.get(i)
- .getPartitioningType()) {
- continue;
- }
- IPartitioningProperty requiredPropertyForChild = reqdProperties[i].getPartitioningProperty();
- // If child i's delivered partitioning property already satisfies the required property, stop and return the child index.
- if (PropertiesUtil.matchPartitioningProps(requiredPropertyForChild, deliveredPropertyFromChild, true)) {
- partitioningCompatibleChild = i;
- break;
- }
- }
- return partitioningCompatibleChild;
- }
-
private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, IPhysicalPropertiesVector required,
boolean nestedPlan, IOptimizationContext context) throws AlgebricksException {
@@ -359,6 +305,60 @@
return changed;
}
+ private boolean physOptimizePlan(ILogicalPlan plan, IPhysicalPropertiesVector pvector, boolean nestedPlan,
+ IOptimizationContext context) throws AlgebricksException {
+ boolean loggerTraceEnabled = AlgebricksConfig.ALGEBRICKS_LOGGER.isTraceEnabled();
+ boolean changed = false;
+ for (Mutable<ILogicalOperator> root : plan.getRoots()) {
+ if (physOptimizeOp(root, pvector, nestedPlan, context)) {
+ changed = true;
+ }
+ AbstractLogicalOperator op = (AbstractLogicalOperator) root.getValue();
+ op.computeDeliveredPhysicalProperties(context);
+ if (loggerTraceEnabled) {
+ AlgebricksConfig.ALGEBRICKS_LOGGER.trace(">>>> Structural properties for " + op.getPhysicalOperator()
+ + ": " + op.getDeliveredPhysicalProperties() + "\n");
+ }
+ }
+ return changed;
+ }
+
+ // Gets the index of a child to start top-down data property enforcement.
+ // If there is a partitioning-compatible child with the operator in opRef,
+ // start from this child; otherwise, start from child zero.
+ private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan,
+ IOptimizationContext context) throws AlgebricksException {
+ IPhysicalPropertiesVector[] reqdProperties = null;
+ if (pr != null) {
+ reqdProperties = pr.getRequiredProperties();
+ }
+
+ List<IPartitioningProperty> deliveredPartitioningPropertiesFromChildren = new ArrayList<>();
+ for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+ AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+ deliveredPartitioningPropertiesFromChildren
+ .add(child.getDeliveredPhysicalProperties().getPartitioningProperty());
+ }
+ int partitioningCompatibleChild = 0;
+ for (int i = 0; i < op.getInputs().size(); i++) {
+ IPartitioningProperty deliveredPropertyFromChild = deliveredPartitioningPropertiesFromChildren.get(i);
+ if (reqdProperties == null || reqdProperties[i] == null
+ || reqdProperties[i].getPartitioningProperty() == null || deliveredPropertyFromChild == null
+ || reqdProperties[i].getPartitioningProperty()
+ .getPartitioningType() != deliveredPartitioningPropertiesFromChildren.get(i)
+ .getPartitioningType()) {
+ continue;
+ }
+ IPartitioningProperty requiredPropertyForChild = reqdProperties[i].getPartitioningProperty();
+ // If child i's delivered partitioning property already satisfies the required property, stop and return the child index.
+ if (PropertiesUtil.matchPartitioningProps(requiredPropertyForChild, deliveredPropertyFromChild, true)) {
+ partitioningCompatibleChild = i;
+ break;
+ }
+ }
+ return partitioningCompatibleChild;
+ }
+
private IPhysicalPropertiesVector newPropertiesDiff(AbstractLogicalOperator newChild,
IPhysicalPropertiesVector required, boolean mayExpandPartitioningProperties, IOptimizationContext context)
throws AlgebricksException {
@@ -888,13 +888,13 @@
return forwardOperator;
}
- private boolean allAreOrderProps(List<ILocalStructuralProperty> cldLocals) {
- for (ILocalStructuralProperty lsp : cldLocals) {
- if (lsp.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {
+ private boolean allAreOrderProps(List<ILocalStructuralProperty> childLocalProperties) {
+ for (ILocalStructuralProperty childLocalProperty : childLocalProperties) {
+ if (childLocalProperty.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {
return false;
}
}
- return !cldLocals.isEmpty();
+ return !childLocalProperties.isEmpty();
}
private void printOp(AbstractLogicalOperator op) throws AlgebricksException {
@@ -927,7 +927,7 @@
throws AlgebricksException {
ILogicalOperator oldOp = opRef.getValue();
opRef.setValue(newOp);
- newOp.getInputs().add(new MutableObject<ILogicalOperator>(oldOp));
+ newOp.getInputs().add(new MutableObject<>(oldOp));
newOp.recomputeSchema();
newOp.computeDeliveredPhysicalProperties(context);
context.computeAndSetTypeEnvironmentForOperator(newOp);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
index c437619..0b1c0a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/DeterministicPartitionBatchManager.java
@@ -69,7 +69,7 @@
}
}
- private synchronized boolean allPartitionsAdded() {
+ private boolean allPartitionsAdded() {
for (int i = 0; i < partitions.length; i++) {
if (partitions[i] == null) {
return false;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
index 2646c94..6aa305b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/collectors/SequentialMergeFrameReader.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.comm.IFrameReader;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+// TODO(ali): consider sort-concat-merge as an alternative.
public class SequentialMergeFrameReader implements IFrameReader {
private final int numSenders;
private final IPartitionBatchManager partitionBatchManager;