Range connector update with order by hint.
Change-Id: Iec1fbd79f62bfeef2081858bdfab3ff894f63e03
Reviewed-on: https://asterix-gerrit.ics.uci.edu/253
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ildar Absalyamov <ildar.absalyamov@gmail.com>
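
Note (illustration, not part of the patch): the order-by hint is expected to be carried as an operator annotation that the rewriter changed below reads back. A minimal sketch, assuming a front end that already holds the logical operator for the order and an IRangeMap describing the split points; the class and method names here are hypothetical, only OperatorAnnotations.USE_RANGE_CONNECTOR and IRangeMap come from this change.

import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;

public class RangeHintExample {
    // Attach the range hint; EnforceStructuralPropertiesRule (changed below) looks this
    // annotation up when it needs an ordered merge and, if present, picks the new
    // RANGE_PARTITION_MERGE_EXCHANGE instead of a sort-merge exchange.
    public static void attachRangeHint(AbstractLogicalOperator op, IRangeMap rangeMap) {
        op.getAnnotations().put(OperatorAnnotations.USE_RANGE_CONNECTOR, rangeMap);
    }
}
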
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java
index 16cba67..251edca 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java
@@ -18,6 +18,7 @@
// hints
public static final String USE_HASH_GROUP_BY = "USE_HASH_GROUP_BY"; // -->
public static final String USE_EXTERNAL_GROUP_BY = "USE_EXTERNAL_GROUP_BY"; // -->
+ public static final String USE_RANGE_CONNECTOR = "USE_RANGE_CONNECTOR"; // -->
// Boolean
public static final String CARDINALITY = "CARDINALITY"; // -->
// Integer
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 911bfa1..d0be2a1 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -47,6 +47,7 @@
PRE_SORTED_DISTINCT_BY,
RANDOM_MERGE_EXCHANGE,
RANGE_PARTITION_EXCHANGE,
+ RANGE_PARTITION_MERGE_EXCHANGE,
RTREE_SEARCH,
RUNNING_AGGREGATE,
SINGLE_PARTITION_INVERTED_INDEX_SEARCH,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 8796da4..8169ad0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -62,6 +62,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
@@ -154,6 +155,13 @@
}
break;
}
+ case RANGE_PARTITION_MERGE_EXCHANGE: {
+ RangePartitionMergePOperator concreteOp = (RangePartitionMergePOperator) physOp;
+ for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
+ usedVariables.add(partCol.getColumn());
+ }
+ break;
+ }
default: {
throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
new file mode 100644
index 0000000..99f68e9
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+public class RangePartitionMergePOperator extends AbstractExchangePOperator {
+
+ private List<OrderColumn> partitioningFields;
+ private INodeDomain domain;
+ private IRangeMap rangeMap;
+
+ public RangePartitionMergePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
+ this.partitioningFields = partitioningFields;
+ this.domain = domain;
+ this.rangeMap = rangeMap;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.RANGE_PARTITION_MERGE_EXCHANGE;
+ }
+
+ public List<OrderColumn> getPartitioningFields() {
+ return partitioningFields;
+ }
+
+ public INodeDomain getDomain() {
+ return domain;
+ }
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+ List<LogicalVariable> varList = new ArrayList<LogicalVariable>();
+ for (OrderColumn oc : partitioningFields) {
+ varList.add(oc.getColumn());
+ }
+ IPartitioningProperty p = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(varList), domain);
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
+ List<ILocalStructuralProperty> locals = new ArrayList<ILocalStructuralProperty>();
+ for (ILocalStructuralProperty prop : op2Locals) {
+ if (prop.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
+ locals.add(prop);
+ } else {
+ break;
+ }
+ }
+
+ this.deliveredProperties = new StructuralPropertiesVector(p, locals);
+ }
+
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
+ List<OrderColumn> columns = new ArrayList<OrderColumn>();
+ for (OrderColumn oc : partitioningFields) {
+ LogicalVariable var = oc.getColumn();
+ columns.add(new OrderColumn(var, oc.getOrder()));
+ }
+ orderProps.add(new LocalOrderProperty(columns));
+ StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
+ orderProps) };
+ return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @Override
+ public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+ ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+ int n = partitioningFields.size();
+ int[] sortFields = new int[n];
+ IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+ INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+ INormalizedKeyComputerFactory nkcf = null;
+
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+ int i = 0;
+ for (OrderColumn oc : partitioningFields) {
+ LogicalVariable var = oc.getColumn();
+ sortFields[i] = opSchema.findVariable(var);
+ Object type = env.getVarType(var);
+ OrderKind order = oc.getOrder();
+ if (i == 0 && nkcfProvider != null && type != null) {
+ nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+ }
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+ i++;
+ }
+ ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+ IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+ }
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+ }
+
+}
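
A minimal usage sketch of the new operator (variable and method names are illustrative): the rewriter below builds it the same way, taking the order columns from the merge requirement and the range map from the USE_RANGE_CONNECTOR hint.

import java.util.Arrays;
import java.util.List;

import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;

public class RangeMergeExchangeExample {
    // Build the exchange for an ascending sort on one variable; the range map boundaries
    // decide which consumer each tuple is routed to, and the sorted input streams are
    // merged on the receiving side.
    public static RangePartitionMergePOperator rangeMergeOn(LogicalVariable sortVar,
            INodeDomain domain, IRangeMap rangeMap) {
        List<OrderColumn> ordCols = Arrays.asList(new OrderColumn(sortVar, OrderKind.ASC));
        return new RangePartitionMergePOperator(ordCols, domain, rangeMap);
    }
}
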
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
index 4142cdc..66955e9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
@@ -19,13 +19,15 @@
import java.util.List;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -35,17 +37,27 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
public class RangePartitionPOperator extends AbstractExchangePOperator {
- private ArrayList<OrderColumn> partitioningFields;
+ private List<OrderColumn> partitioningFields;
private INodeDomain domain;
+ private IRangeMap rangeMap;
- public RangePartitionPOperator(ArrayList<OrderColumn> partitioningFields, INodeDomain domain) {
+ public RangePartitionPOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
this.partitioningFields = partitioningFields;
this.domain = domain;
+ this.rangeMap = rangeMap;
}
@Override
@@ -53,9 +65,17 @@
return PhysicalOperatorTag.RANGE_PARTITION_EXCHANGE;
}
+ public List<OrderColumn> getPartitioningFields() {
+ return partitioningFields;
+ }
+
+ public INodeDomain getDomain() {
+ return domain;
+ }
+
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- IPartitioningProperty p = new OrderedPartitionedProperty(partitioningFields, domain);
+ IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain);
this.deliveredProperties = new StructuralPropertiesVector(p, new LinkedList<ILocalStructuralProperty>());
}
@@ -68,11 +88,35 @@
@Override
public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
- throw new NotImplementedException();
+ int n = partitioningFields.size();
+ int[] sortFields = new int[n];
+ IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+ INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+ INormalizedKeyComputerFactory nkcf = null;
+
+ IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+ int i = 0;
+ for (OrderColumn oc : partitioningFields) {
+ LogicalVariable var = oc.getColumn();
+ sortFields[i] = opSchema.findVariable(var);
+ Object type = env.getVarType(var);
+ OrderKind order = oc.getOrder();
+ if (i == 0 && nkcfProvider != null && type != null) {
+ nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+ }
+ IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+ comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+ i++;
+ }
+ ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+ IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+ return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
}
-
- public List<OrderColumn> getPartitioningFields() {
- return partitioningFields;
+
+ @Override
+ public String toString() {
+ return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
}
}
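
For comparison, a sketch of the connectors the two range exchanges now generate (parameter names are illustrative; both paths reuse the FieldRangePartitionComputerFactory added in this change).

import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
import edu.uci.ics.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;

public class RangeConnectorExamples {
    // RANGE_PARTITION_EXCHANGE: route each tuple to the consumer that owns its key range.
    public static IConnectorDescriptor rangeOnly(IConnectorDescriptorRegistry spec, int[] sortFields,
            IBinaryComparatorFactory[] comps, IRangeMap rangeMap) {
        ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
        return new MToNPartitioningConnectorDescriptor(spec, tpcf);
    }

    // RANGE_PARTITION_MERGE_EXCHANGE: same routing, plus a sorted merge of the incoming
    // streams on the consumer side (nkcf may be null if no normalized key is available).
    public static IConnectorDescriptor rangeAndMerge(IConnectorDescriptorRegistry spec, int[] sortFields,
            IBinaryComparatorFactory[] comps, INormalizedKeyComputerFactory nkcf, IRangeMap rangeMap) {
        ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
        return new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
    }
}
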
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index e1de495..a181304 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -44,173 +44,165 @@
import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
public class JobGenContext {
- private final IOperatorSchema outerFlowSchema;
- private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
- private final ISerializerDeserializerProvider serializerDeserializerProvider;
- private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
- private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
- private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
- private final IPrinterFactoryProvider printerFactoryProvider;
- private final ITypeTraitProvider typeTraitProvider;
- private final IMetadataProvider<?, ?> metadataProvider;
- private final INullWriterFactory nullWriterFactory;
- private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
- private final Object appContext;
- private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
- private final IBinaryIntegerInspectorFactory integerInspectorFactory;
- private final IExpressionRuntimeProvider expressionRuntimeProvider;
- private final IExpressionTypeComputer expressionTypeComputer;
- private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
- private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
- private final IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
- private final int frameSize;
- private AlgebricksPartitionConstraint clusterLocations;
- private int varCounter;
- private final ITypingContext typingContext;
+ private final IOperatorSchema outerFlowSchema;
+ private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
+ private final ISerializerDeserializerProvider serializerDeserializerProvider;
+ private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
+ private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
+ private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
+ private final IPrinterFactoryProvider printerFactoryProvider;
+ private final ITypeTraitProvider typeTraitProvider;
+ private final IMetadataProvider<?, ?> metadataProvider;
+ private final INullWriterFactory nullWriterFactory;
+ private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
+ private final Object appContext;
+ private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
+ private final IBinaryIntegerInspectorFactory integerInspectorFactory;
+ private final IExpressionRuntimeProvider expressionRuntimeProvider;
+ private final IExpressionTypeComputer expressionTypeComputer;
+ private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
+ private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
+ private final IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
+ private final int frameSize;
+ private AlgebricksPartitionConstraint clusterLocations;
+ private int varCounter;
+ private final ITypingContext typingContext;
- public JobGenContext(
- IOperatorSchema outerFlowSchema,
- IMetadataProvider<?, ?> metadataProvider,
- Object appContext,
- ISerializerDeserializerProvider serializerDeserializerProvider,
- IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
- IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider,
- IBinaryComparatorFactoryProvider comparatorFactoryProvider,
- ITypeTraitProvider typeTraitProvider,
- IBinaryBooleanInspectorFactory booleanInspectorFactory,
- IBinaryIntegerInspectorFactory integerInspectorFactory,
- IPrinterFactoryProvider printerFactoryProvider,
- INullWriterFactory nullWriterFactory,
- INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
- IExpressionRuntimeProvider expressionRuntimeProvider,
- IExpressionTypeComputer expressionTypeComputer,
- INullableTypeComputer nullableTypeComputer,
- ITypingContext typingContext,
- IExpressionEvalSizeComputer expressionEvalSizeComputer,
- IPartialAggregationTypeComputer partialAggregationTypeComputer,
- IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider, int frameSize, AlgebricksPartitionConstraint clusterLocations) {
- this.outerFlowSchema = outerFlowSchema;
- this.metadataProvider = metadataProvider;
- this.appContext = appContext;
- this.serializerDeserializerProvider = serializerDeserializerProvider;
- this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
- this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
- this.comparatorFactoryProvider = comparatorFactoryProvider;
- this.typeTraitProvider = typeTraitProvider;
- this.booleanInspectorFactory = booleanInspectorFactory;
- this.integerInspectorFactory = integerInspectorFactory;
- this.printerFactoryProvider = printerFactoryProvider;
- this.clusterLocations = clusterLocations;
- this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
- this.nullWriterFactory = nullWriterFactory;
- this.expressionRuntimeProvider = expressionRuntimeProvider;
- this.expressionTypeComputer = expressionTypeComputer;
- this.typingContext = typingContext;
- this.expressionEvalSizeComputer = expressionEvalSizeComputer;
- this.partialAggregationTypeComputer = partialAggregationTypeComputer;
- this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
- this.frameSize = frameSize;
- this.varCounter = 0;
- }
+ public JobGenContext(IOperatorSchema outerFlowSchema, IMetadataProvider<?, ?> metadataProvider, Object appContext,
+ ISerializerDeserializerProvider serializerDeserializerProvider,
+ IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
+ IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider,
+ IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider,
+ IBinaryBooleanInspectorFactory booleanInspectorFactory,
+ IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider,
+ INullWriterFactory nullWriterFactory,
+ INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
+ IExpressionRuntimeProvider expressionRuntimeProvider, IExpressionTypeComputer expressionTypeComputer,
+ INullableTypeComputer nullableTypeComputer, ITypingContext typingContext,
+ IExpressionEvalSizeComputer expressionEvalSizeComputer,
+ IPartialAggregationTypeComputer partialAggregationTypeComputer,
+ IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider, int frameSize,
+ AlgebricksPartitionConstraint clusterLocations) {
+ this.outerFlowSchema = outerFlowSchema;
+ this.metadataProvider = metadataProvider;
+ this.appContext = appContext;
+ this.serializerDeserializerProvider = serializerDeserializerProvider;
+ this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
+ this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
+ this.comparatorFactoryProvider = comparatorFactoryProvider;
+ this.typeTraitProvider = typeTraitProvider;
+ this.booleanInspectorFactory = booleanInspectorFactory;
+ this.integerInspectorFactory = integerInspectorFactory;
+ this.printerFactoryProvider = printerFactoryProvider;
+ this.clusterLocations = clusterLocations;
+ this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
+ this.nullWriterFactory = nullWriterFactory;
+ this.expressionRuntimeProvider = expressionRuntimeProvider;
+ this.expressionTypeComputer = expressionTypeComputer;
+ this.typingContext = typingContext;
+ this.expressionEvalSizeComputer = expressionEvalSizeComputer;
+ this.partialAggregationTypeComputer = partialAggregationTypeComputer;
+ this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
+ this.frameSize = frameSize;
+ this.varCounter = 0;
+ }
- public IOperatorSchema getOuterFlowSchema() {
- return outerFlowSchema;
- }
+ public IOperatorSchema getOuterFlowSchema() {
+ return outerFlowSchema;
+ }
- public AlgebricksPartitionConstraint getClusterLocations() {
- return clusterLocations;
- }
+ public AlgebricksPartitionConstraint getClusterLocations() {
+ return clusterLocations;
+ }
- public IMetadataProvider<?, ?> getMetadataProvider() {
- return metadataProvider;
- }
+ public IMetadataProvider<?, ?> getMetadataProvider() {
+ return metadataProvider;
+ }
- public Object getAppContext() {
- return appContext;
- }
+ public Object getAppContext() {
+ return appContext;
+ }
- public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
- return serializerDeserializerProvider;
- }
+ public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
+ return serializerDeserializerProvider;
+ }
- public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
- return hashFunctionFactoryProvider;
- }
+ public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
+ return hashFunctionFactoryProvider;
+ }
- public IBinaryHashFunctionFamilyProvider getBinaryHashFunctionFamilyProvider() {
- return hashFunctionFamilyProvider;
- }
+ public IBinaryHashFunctionFamilyProvider getBinaryHashFunctionFamilyProvider() {
+ return hashFunctionFamilyProvider;
+ }
- public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
- return comparatorFactoryProvider;
- }
+ public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
+ return comparatorFactoryProvider;
+ }
- public ITypeTraitProvider getTypeTraitProvider() {
- return typeTraitProvider;
- }
+ public ITypeTraitProvider getTypeTraitProvider() {
+ return typeTraitProvider;
+ }
- public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
- return booleanInspectorFactory;
- }
+ public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
+ return booleanInspectorFactory;
+ }
- public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
- return integerInspectorFactory;
- }
+ public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
+ return integerInspectorFactory;
+ }
- public IPrinterFactoryProvider getPrinterFactoryProvider() {
- return printerFactoryProvider;
- }
-
- public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider(){
- return predEvaluatorFactoryProvider;
- }
+ public IPrinterFactoryProvider getPrinterFactoryProvider() {
+ return printerFactoryProvider;
+ }
- public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
- return expressionRuntimeProvider;
- }
+ public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider() {
+ return predEvaluatorFactoryProvider;
+ }
- public IOperatorSchema getSchema(ILogicalOperator op) {
- return schemaMap.get(op);
- }
+ public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
+ return expressionRuntimeProvider;
+ }
- public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
- schemaMap.put(op, schema);
- }
+ public IOperatorSchema getSchema(ILogicalOperator op) {
+ return schemaMap.get(op);
+ }
- public LogicalVariable createNewVar() {
- varCounter++;
- LogicalVariable var = new LogicalVariable(-varCounter);
- return var;
- }
+ public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
+ schemaMap.put(op, schema);
+ }
- public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env)
- throws AlgebricksException {
- return expressionTypeComputer.getType(expr,
- typingContext.getMetadataProvider(), env);
- }
+ public LogicalVariable createNewVar() {
+ varCounter++;
+ LogicalVariable var = new LogicalVariable(-varCounter);
+ return var;
+ }
- public INullWriterFactory getNullWriterFactory() {
- return nullWriterFactory;
- }
+ public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env) throws AlgebricksException {
+ return expressionTypeComputer.getType(expr, typingContext.getMetadataProvider(), env);
+ }
- public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
- return normalizedKeyComputerFactoryProvider;
- }
+ public INullWriterFactory getNullWriterFactory() {
+ return nullWriterFactory;
+ }
- public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
- return expressionEvalSizeComputer;
- }
+ public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
+ return normalizedKeyComputerFactoryProvider;
+ }
- public int getFrameSize() {
- return frameSize;
- }
+ public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
+ return expressionEvalSizeComputer;
+ }
- public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
- return partialAggregationTypeComputer;
- }
+ public int getFrameSize() {
+ return frameSize;
+ }
- public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
- return typingContext.getOutputTypeEnvironment(op);
- }
+ public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
+ return partialAggregationTypeComputer;
+ }
+
+ public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
+ return typingContext.getOutputTypeEnvironment(op);
+ }
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 9d72881..3b115eb 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -35,7 +35,9 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
@@ -55,6 +57,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
@@ -83,6 +86,7 @@
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import edu.uci.ics.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
@@ -489,9 +493,15 @@
if (ordCols == null || ordCols.size() == 0) {
pop = new RandomMergeExchangePOperator();
} else {
- OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
- sortColumns = ordCols.toArray(sortColumns);
- pop = new SortMergeExchangePOperator(sortColumns);
+ if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
+ IRangeMap rangeMap = (IRangeMap) op.getAnnotations().get(
+ OperatorAnnotations.USE_RANGE_CONNECTOR);
+ pop = new RangePartitionMergePOperator(ordCols, domain, rangeMap);
+ } else {
+ OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
+ sortColumns = ordCols.toArray(sortColumns);
+ pop = new SortMergeExchangePOperator(sortColumns);
+ }
}
break;
}
@@ -519,7 +529,7 @@
break;
}
case ORDERED_PARTITIONED: {
- pop = new RangePartitionPOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain);
+ pop = new RangePartitionPOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain, null);
break;
}
case BROADCAST: {
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RepartitionComputerGeneratorFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RepartitionComputerFamily.java
similarity index 65%
rename from hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RepartitionComputerGeneratorFactory.java
rename to hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RepartitionComputerFamily.java
index df66531..3ac7cd9 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RepartitionComputerGeneratorFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RepartitionComputerFamily.java
@@ -19,29 +19,29 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-public class RepartitionComputerGeneratorFactory implements ITuplePartitionComputerFamily{
-
- private static final long serialVersionUID = 1L;
+public class RepartitionComputerFamily implements ITuplePartitionComputerFamily {
- private int factor;
- private ITuplePartitionComputerFamily delegateFactory;
+ private static final long serialVersionUID = 1L;
- public RepartitionComputerGeneratorFactory(int factor, ITuplePartitionComputerFamily delegate) {
- this.factor = factor;
- this.delegateFactory = delegate;
- }
+ private int factor;
+ private ITuplePartitionComputerFamily delegateFactory;
- @Override
- public ITuplePartitionComputer createPartitioner(int seed) {
- final int s = seed;
- return new ITuplePartitionComputer() {
+ public RepartitionComputerFamily(int factor, ITuplePartitionComputerFamily delegate) {
+ this.factor = factor;
+ this.delegateFactory = delegate;
+ }
+
+ @Override
+ public ITuplePartitionComputer createPartitioner(int seed) {
+ final int s = seed;
+ return new ITuplePartitionComputer() {
private ITuplePartitionComputer delegate = delegateFactory.createPartitioner(s);
@Override
public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
- return delegate.partition(accessor, tIndex, factor * nParts) / factor;
+ return delegate.partition(accessor, tIndex, factor * nParts) / factor;
}
};
- }
+ }
}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
new file mode 100644
index 0000000..3022c51
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.partition.range;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory {
+ private static final long serialVersionUID = 1L;
+ private final int[] rangeFields;
+ private IRangeMap rangeMap;
+ private IBinaryComparatorFactory[] comparatorFactories;
+
+ public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
+ IRangeMap rangeMap) {
+ this.rangeFields = rangeFields;
+ this.comparatorFactories = comparatorFactories;
+ this.rangeMap = rangeMap;
+ }
+
+ @Override
+ public ITuplePartitionComputer createPartitioner() {
+ final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+ for (int i = 0; i < comparatorFactories.length; ++i) {
+ comparators[i] = comparatorFactories[i].createBinaryComparator();
+ }
+ return new ITuplePartitionComputer() {
+ /**
+ * Map the tuple's range slot to one of the consumer partitions.
+ */
+ @Override
+ public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+ if (nParts == 1) {
+ return 0;
+ }
+ int slotIndex = getRangePartition(accessor, tIndex);
+ // Map range partition to node partitions.
+ double rangesPerPart = 1;
+ if (rangeMap.getSplitCount() + 1 > nParts) {
+ rangesPerPart = ((double) rangeMap.getSplitCount() + 1) / nParts;
+ }
+ return (int) Math.floor(slotIndex / rangesPerPart);
+ }
+
+ /*
+ * Determine the range partition.
+ */
+ public int getRangePartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+ int slotIndex = 0;
+ for (int i = 0; i < rangeMap.getSplitCount(); ++i) {
+ int c = compareSlotAndFields(accessor, tIndex, i);
+ if (c < 0) {
+ return slotIndex;
+ }
+ slotIndex++;
+ }
+ return slotIndex;
+ }
+
+ public int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int fieldIndex)
+ throws HyracksDataException {
+ int c = 0;
+ int startOffset = accessor.getTupleStartOffset(tIndex);
+ int slotLength = accessor.getFieldSlotsLength();
+ for (int f = 0; f < comparators.length; ++f) {
+ int fIdx = rangeFields[f];
+ int fStart = accessor.getFieldStartOffset(tIndex, fIdx);
+ int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
+ c = comparators[f].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart, fEnd
+ - fStart, rangeMap.getByteArray(fieldIndex, f), rangeMap.getStartOffset(fieldIndex, f),
+ rangeMap.getLength(fieldIndex, f));
+ if (c != 0) {
+ return c;
+ }
+ }
+ return c;
+ }
+
+ };
+ }
+}
\ No newline at end of file
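
A worked example (not part of the patch) of the slot-to-partition arithmetic in partition() above, with the same formula lifted into a standalone method.

public class RangeSlotMappingExample {
    // With splitCount split points there are splitCount + 1 range slots. When there are more
    // slots than consumers, rangesPerPart > 1 and adjacent slots share a consumer; otherwise
    // slot i maps directly to partition i.
    static int mapSlotToPartition(int slotIndex, int splitCount, int nParts) {
        double rangesPerPart = 1;
        if (splitCount + 1 > nParts) {
            rangesPerPart = ((double) splitCount + 1) / nParts;
        }
        return (int) Math.floor(slotIndex / rangesPerPart);
    }

    public static void main(String[] args) {
        // 3 split points (4 slots), 2 consumers: rangesPerPart = 2, so this prints 0, 0, 1, 1.
        for (int slot = 0; slot <= 3; slot++) {
            System.out.println(mapSlotToPartition(slot, 3, 2));
        }
    }
}
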
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/IRangeMap.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/IRangeMap.java
new file mode 100644
index 0000000..a503146
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/IRangeMap.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hyracks.dataflow.common.data.partition.range;
+
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+
+public interface IRangeMap {
+ public IPointable getFieldSplit(int columnIndex, int splitIndex);
+
+ public int getSplitCount();
+
+ public byte[] getByteArray(int columnIndex, int splitIndex);
+
+ public int getStartOffset(int columnIndex, int splitIndex);
+
+ public int getLength(int columnIndex, int splitIndex);
+
+ public int getTag(int columnIndex, int splitIndex);
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/RangeMap.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/RangeMap.java
new file mode 100644
index 0000000..c290d24
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/RangeMap.java
@@ -0,0 +1,80 @@
+package edu.uci.ics.hyracks.dataflow.common.data.partition.range;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+
+/**
+ * The range map stores the field split values in a byte array.
+ * The first split value for each field is stored first, followed by the second split value for each field, and so on.
+ */
+public class RangeMap implements IRangeMap, Serializable {
+ private final int fields;
+ private final byte[] bytes;
+ private final int[] offsets;
+
+ public RangeMap(int fields, byte[] bytes, int[] offsets) {
+ this.fields = fields;
+ this.bytes = bytes;
+ this.offsets = offsets;
+ }
+
+ @Override
+ public IPointable getFieldSplit(int columnIndex, int splitIndex) {
+ IPointable p = VoidPointable.FACTORY.createPointable();
+ int index = getFieldIndex(columnIndex, splitIndex);
+ p.set(bytes, getFieldStart(index), getFieldLength(index));
+ return p;
+ }
+
+ @Override
+ public int getSplitCount() {
+ return offsets.length / fields;
+ }
+
+ @Override
+ public byte[] getByteArray(int columnIndex, int splitIndex) {
+ return bytes;
+ }
+
+ @Override
+ public int getTag(int columnIndex, int splitIndex) {
+ return getFieldTag(getFieldIndex(columnIndex, splitIndex));
+ }
+
+ @Override
+ public int getStartOffset(int columnIndex, int splitIndex) {
+ return getFieldStart(getFieldIndex(columnIndex, splitIndex));
+ }
+
+ @Override
+ public int getLength(int columnIndex, int splitIndex) {
+ return getFieldLength(getFieldIndex(columnIndex, splitIndex));
+ }
+
+ private int getFieldIndex(int columnIndex, int splitIndex) {
+ return splitIndex * fields + columnIndex;
+ }
+
+ private int getFieldTag(int index) {
+ return bytes[getFieldStart(index)];
+ }
+
+ private int getFieldStart(int index) {
+ int start = 0;
+ if (index != 0) {
+ start = offsets[index - 1];
+ }
+ return start;
+ }
+
+ private int getFieldLength(int index) {
+ int length = offsets[index];
+ if (index != 0) {
+ length -= offsets[index - 1];
+ }
+ return length;
+ }
+
+}
\ No newline at end of file
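
A worked example (byte values made up for illustration) of the layout described in the class comment: value (column c, split s) lives at slot index s * fields + c, and offsets holds the cumulative end position of each stored value.

import edu.uci.ics.hyracks.dataflow.common.data.partition.range.RangeMap;

public class RangeMapLayoutExample {
    public static void main(String[] args) {
        // Two fields, two split points.
        // Split 0: field 0 -> {1}, field 1 -> {2, 2}; split 1: field 0 -> {5}, field 1 -> {6, 6}.
        byte[] bytes = new byte[] { 1, 2, 2, 5, 6, 6 };
        int[] offsets = new int[] { 1, 3, 4, 6 }; // cumulative end offset of each stored value
        RangeMap map = new RangeMap(2, bytes, offsets);

        System.out.println(map.getSplitCount());      // 2 (= offsets.length / fields)
        System.out.println(map.getStartOffset(1, 0)); // 1 (second field of the first split)
        System.out.println(map.getLength(1, 0));      // 2
        System.out.println(map.getStartOffset(0, 1)); // 3 (first field of the second split)
        System.out.println(map.getLength(0, 1));      // 1
    }
}
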
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 540c31b..c03f474 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -46,7 +46,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerGeneratorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFamily;
import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
@@ -404,10 +404,8 @@
state.hybridHJ.closeProbe(writer);
BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
- hpcRep0 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf0)
- .createPartitioner(0);
- hpcRep1 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf1)
- .createPartitioner(0);
+ hpcRep0 = new RepartitionComputerFamily(state.numOfPartitions, hpcf0).createPartitioner(0);
+ hpcRep1 = new RepartitionComputerFamily(state.numOfPartitions, hpcf1).createPartitioner(0);
rPartbuff.clear();
for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) {
@@ -440,10 +438,10 @@
long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj
.getProbePartitionSize(pid) / ctx.getFrameSize());
- LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId()
- + ") (pid " + pid + ") - (level " + level + ") - wasReversed " + wasReversed
- + " - BuildSize:\t" + buildPartSize + "\tProbeSize:\t" + probePartSize + " - MemForJoin "
- + (state.memForJoin) + " - LeftOuter is " + isLeftOuter);
+ LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId() + ") (pid "
+ + pid + ") - (level " + level + ") - wasReversed " + wasReversed + " - BuildSize:\t"
+ + buildPartSize + "\tProbeSize:\t" + probePartSize + " - MemForJoin " + (state.memForJoin)
+ + " - LeftOuter is " + isLeftOuter);
//Apply in-Mem HJ if possible
if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)