Range connector update with order-by hint.

Adds a RANGE_PARTITION_MERGE_EXCHANGE physical operator tag and
RangePartitionMergePOperator, plus the range-partitioning runtime
(FieldRangePartitionComputerFactory, IRangeMap, RangeMap).
EnforceStructuralPropertiesRule now honors the new USE_RANGE_CONNECTOR
annotation when choosing a merge exchange, and RangePartitionPOperator
builds an M-to-N range-partitioning connector instead of throwing
NotImplementedException. RepartitionComputerGeneratorFactory is renamed
to RepartitionComputerFamily.

Change-Id: Iec1fbd79f62bfeef2081858bdfab3ff894f63e03
Reviewed-on: https://asterix-gerrit.ics.uci.edu/253
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ildar Absalyamov <ildar.absalyamov@gmail.com>
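
The new USE_RANGE_CONNECTOR annotation carries an IRangeMap that EnforceStructuralPropertiesRule (below) reads when deciding between a sort-merge and a range-partition-merge exchange. A minimal sketch of how a front end could attach the hint, assuming it has already built a range map from the user's order-by hint (the class, method, and variable names here are illustrative, not part of this patch):

    import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
    import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
    import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;

    public final class RangeHintExample {
        // Attach the hint to the operator whose output ordering should be enforced
        // with a range connector; the rewrite rule casts the annotation value to IRangeMap.
        public static void annotate(AbstractLogicalOperator op, IRangeMap rangeMapFromHint) {
            op.getAnnotations().put(OperatorAnnotations.USE_RANGE_CONNECTOR, rangeMapFromHint);
        }
    }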
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java
index 16cba67..251edca 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/OperatorAnnotations.java
@@ -18,6 +18,7 @@
     // hints
     public static final String USE_HASH_GROUP_BY = "USE_HASH_GROUP_BY"; // -->
     public static final String USE_EXTERNAL_GROUP_BY = "USE_EXTERNAL_GROUP_BY"; // -->
+    public static final String USE_RANGE_CONNECTOR = "USE_RANGE_CONNECTOR"; // -->
     // Boolean
     public static final String CARDINALITY = "CARDINALITY"; // -->
     // Integer
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 911bfa1..d0be2a1 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -47,6 +47,7 @@
     PRE_SORTED_DISTINCT_BY,
     RANDOM_MERGE_EXCHANGE,
     RANGE_PARTITION_EXCHANGE,
+    RANGE_PARTITION_MERGE_EXCHANGE,
     RTREE_SEARCH,
     RUNNING_AGGREGATE,
     SINGLE_PARTITION_INVERTED_INDEX_SEARCH,
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 8796da4..8169ad0 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -62,6 +62,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
@@ -154,6 +155,13 @@
                     }
                     break;
                 }
+                case RANGE_PARTITION_MERGE_EXCHANGE: {
+                    RangePartitionMergePOperator concreteOp = (RangePartitionMergePOperator) physOp;
+                    for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
+                        usedVariables.add(partCol.getColumn());
+                    }
+                    break;
+                }
                 default: {
                     throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");
                 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
new file mode 100644
index 0000000..99f68e9
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergePOperator.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+public class RangePartitionMergePOperator extends AbstractExchangePOperator {
+
+    private List<OrderColumn> partitioningFields;
+    private INodeDomain domain;
+    private IRangeMap rangeMap;
+
+    public RangePartitionMergePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
+        this.partitioningFields = partitioningFields;
+        this.domain = domain;
+        this.rangeMap = rangeMap;
+    }
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.RANGE_PARTITION_MERGE_EXCHANGE;
+    }
+
+    public List<OrderColumn> getPartitioningFields() {
+        return partitioningFields;
+    }
+
+    public INodeDomain getDomain() {
+        return domain;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        List<LogicalVariable> varList = new ArrayList<LogicalVariable>();
+        for (OrderColumn oc : partitioningFields) {
+            varList.add(oc.getColumn());
+        }
+        IPartitioningProperty p = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(varList), domain);
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
+        List<ILocalStructuralProperty> locals = new ArrayList<ILocalStructuralProperty>();
+        for (ILocalStructuralProperty prop : op2Locals) {
+            if (prop.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
+                locals.add(prop);
+            } else {
+                break;
+            }
+        }
+
+        this.deliveredProperties = new StructuralPropertiesVector(p, locals);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        List<ILocalStructuralProperty> orderProps = new LinkedList<ILocalStructuralProperty>();
+        List<OrderColumn> columns = new ArrayList<OrderColumn>();
+        for (OrderColumn oc : partitioningFields) {
+            LogicalVariable var = oc.getColumn();
+            columns.add(new OrderColumn(var, oc.getOrder()));
+        }
+        orderProps.add(new LocalOrderProperty(columns));
+        StructuralPropertiesVector[] r = new StructuralPropertiesVector[] { new StructuralPropertiesVector(null,
+                orderProps) };
+        return new PhysicalRequirements(r, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
+            ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
+        int n = partitioningFields.size();
+        int[] sortFields = new int[n];
+        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        int i = 0;
+        for (OrderColumn oc : partitioningFields) {
+            LogicalVariable var = oc.getColumn();
+            sortFields[i] = opSchema.findVariable(var);
+            Object type = env.getVarType(var);
+            OrderKind order = oc.getOrder();
+            if (i == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+            }
+            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+            i++;
+        }
+        ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+        IConnectorDescriptor conn = new MToNPartitioningMergingConnectorDescriptor(spec, tpcf, sortFields, comps, nkcf);
+        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
+    }
+
+}
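
For illustration, a minimal sketch (not part of this patch) of constructing the new exchange operator directly, e.g. from a custom rewrite rule; the sort variable, node domain, and range map are assumed to be supplied by the caller:

    import java.util.ArrayList;
    import java.util.List;

    import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
    import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
    import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
    import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
    import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
    import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;

    public final class RangeMergeExchangeExample {
        public static RangePartitionMergePOperator create(LogicalVariable sortVar, INodeDomain domain,
                IRangeMap rangeMap) {
            List<OrderColumn> cols = new ArrayList<OrderColumn>();
            cols.add(new OrderColumn(sortVar, OrderKind.ASC));
            // Each producer must deliver tuples locally ordered on 'cols'
            // (see getRequiredPropertiesForChildren); the generated M-to-N merging
            // connector then range-partitions on the same columns and merges the
            // pre-sorted streams at each consumer.
            return new RangePartitionMergePOperator(cols, domain, rangeMap);
        }
    }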
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
index 4142cdc..66955e9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/RangePartitionPOperator.java
@@ -19,13 +19,15 @@
 import java.util.List;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -35,17 +37,27 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
 import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 
 public class RangePartitionPOperator extends AbstractExchangePOperator {
 
-    private ArrayList<OrderColumn> partitioningFields;
+    private List<OrderColumn> partitioningFields;
     private INodeDomain domain;
+    private IRangeMap rangeMap;
 
-    public RangePartitionPOperator(ArrayList<OrderColumn> partitioningFields, INodeDomain domain) {
+    public RangePartitionPOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap) {
         this.partitioningFields = partitioningFields;
         this.domain = domain;
+        this.rangeMap = rangeMap;
     }
 
     @Override
@@ -53,9 +65,17 @@
         return PhysicalOperatorTag.RANGE_PARTITION_EXCHANGE;
     }
 
+    public List<OrderColumn> getPartitioningFields() {
+        return partitioningFields;
+    }
+
+    public INodeDomain getDomain() {
+        return domain;
+    }
+
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        IPartitioningProperty p = new OrderedPartitionedProperty(partitioningFields, domain);
+        IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain);
         this.deliveredProperties = new StructuralPropertiesVector(p, new LinkedList<ILocalStructuralProperty>());
     }
 
@@ -68,11 +88,35 @@
     @Override
     public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
             ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
-        throw new NotImplementedException();
+        int n = partitioningFields.size();
+        int[] sortFields = new int[n];
+        IBinaryComparatorFactory[] comps = new IBinaryComparatorFactory[n];
+
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        INormalizedKeyComputerFactory nkcf = null;
+
+        IVariableTypeEnvironment env = context.getTypeEnvironment(op);
+        int i = 0;
+        for (OrderColumn oc : partitioningFields) {
+            LogicalVariable var = oc.getColumn();
+            sortFields[i] = opSchema.findVariable(var);
+            Object type = env.getVarType(var);
+            OrderKind order = oc.getOrder();
+            if (i == 0 && nkcfProvider != null && type != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(type, order == OrderKind.ASC);
+            }
+            IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
+            comps[i] = bcfp.getBinaryComparatorFactory(type, oc.getOrder() == OrderKind.ASC);
+            i++;
+        }
+        ITuplePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, comps, rangeMap);
+        IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
+        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
-    
-    public List<OrderColumn> getPartitioningFields() {
-        return partitioningFields;
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + " " + partitioningFields + " SPLIT COUNT:" + rangeMap.getSplitCount();
     }
 
 }
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
index e1de495..a181304 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -44,173 +44,165 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
 
 public class JobGenContext {
-	private final IOperatorSchema outerFlowSchema;
-	private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
-	private final ISerializerDeserializerProvider serializerDeserializerProvider;
-	private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
-	private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
-	private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
-	private final IPrinterFactoryProvider printerFactoryProvider;
-	private final ITypeTraitProvider typeTraitProvider;
-	private final IMetadataProvider<?, ?> metadataProvider;
-	private final INullWriterFactory nullWriterFactory;
-	private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
-	private final Object appContext;
-	private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
-	private final IBinaryIntegerInspectorFactory integerInspectorFactory;
-	private final IExpressionRuntimeProvider expressionRuntimeProvider;
-	private final IExpressionTypeComputer expressionTypeComputer;
-	private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
-	private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
-	private final IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
-	private final int frameSize;
-	private AlgebricksPartitionConstraint clusterLocations;
-	private int varCounter;
-	private final ITypingContext typingContext;
+    private final IOperatorSchema outerFlowSchema;
+    private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
+    private final ISerializerDeserializerProvider serializerDeserializerProvider;
+    private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
+    private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
+    private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
+    private final IPrinterFactoryProvider printerFactoryProvider;
+    private final ITypeTraitProvider typeTraitProvider;
+    private final IMetadataProvider<?, ?> metadataProvider;
+    private final INullWriterFactory nullWriterFactory;
+    private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
+    private final Object appContext;
+    private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
+    private final IBinaryIntegerInspectorFactory integerInspectorFactory;
+    private final IExpressionRuntimeProvider expressionRuntimeProvider;
+    private final IExpressionTypeComputer expressionTypeComputer;
+    private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
+    private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
+    private final IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
+    private final int frameSize;
+    private AlgebricksPartitionConstraint clusterLocations;
+    private int varCounter;
+    private final ITypingContext typingContext;
 
-	public JobGenContext(
-			IOperatorSchema outerFlowSchema,
-			IMetadataProvider<?, ?> metadataProvider,
-			Object appContext,
-			ISerializerDeserializerProvider serializerDeserializerProvider,
-			IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
-			IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider,
-			IBinaryComparatorFactoryProvider comparatorFactoryProvider,
-			ITypeTraitProvider typeTraitProvider,
-			IBinaryBooleanInspectorFactory booleanInspectorFactory,
-			IBinaryIntegerInspectorFactory integerInspectorFactory,
-			IPrinterFactoryProvider printerFactoryProvider,
-			INullWriterFactory nullWriterFactory,
-			INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
-			IExpressionRuntimeProvider expressionRuntimeProvider,
-			IExpressionTypeComputer expressionTypeComputer,
-			INullableTypeComputer nullableTypeComputer,
-			ITypingContext typingContext,
-			IExpressionEvalSizeComputer expressionEvalSizeComputer,
-			IPartialAggregationTypeComputer partialAggregationTypeComputer,
-			IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider, int frameSize, AlgebricksPartitionConstraint clusterLocations) {
-		this.outerFlowSchema = outerFlowSchema;
-		this.metadataProvider = metadataProvider;
-		this.appContext = appContext;
-		this.serializerDeserializerProvider = serializerDeserializerProvider;
-		this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
-		this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
-		this.comparatorFactoryProvider = comparatorFactoryProvider;
-		this.typeTraitProvider = typeTraitProvider;
-		this.booleanInspectorFactory = booleanInspectorFactory;
-		this.integerInspectorFactory = integerInspectorFactory;
-		this.printerFactoryProvider = printerFactoryProvider;
-		this.clusterLocations = clusterLocations;
-		this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
-		this.nullWriterFactory = nullWriterFactory;
-		this.expressionRuntimeProvider = expressionRuntimeProvider;
-		this.expressionTypeComputer = expressionTypeComputer;
-		this.typingContext = typingContext;
-		this.expressionEvalSizeComputer = expressionEvalSizeComputer;
-		this.partialAggregationTypeComputer = partialAggregationTypeComputer;
-		this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
-		this.frameSize = frameSize;
-		this.varCounter = 0;
-	}
+    public JobGenContext(IOperatorSchema outerFlowSchema, IMetadataProvider<?, ?> metadataProvider, Object appContext,
+            ISerializerDeserializerProvider serializerDeserializerProvider,
+            IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
+            IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider,
+            IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider,
+            IBinaryBooleanInspectorFactory booleanInspectorFactory,
+            IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider,
+            INullWriterFactory nullWriterFactory,
+            INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
+            IExpressionRuntimeProvider expressionRuntimeProvider, IExpressionTypeComputer expressionTypeComputer,
+            INullableTypeComputer nullableTypeComputer, ITypingContext typingContext,
+            IExpressionEvalSizeComputer expressionEvalSizeComputer,
+            IPartialAggregationTypeComputer partialAggregationTypeComputer,
+            IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider, int frameSize,
+            AlgebricksPartitionConstraint clusterLocations) {
+        this.outerFlowSchema = outerFlowSchema;
+        this.metadataProvider = metadataProvider;
+        this.appContext = appContext;
+        this.serializerDeserializerProvider = serializerDeserializerProvider;
+        this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
+        this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
+        this.comparatorFactoryProvider = comparatorFactoryProvider;
+        this.typeTraitProvider = typeTraitProvider;
+        this.booleanInspectorFactory = booleanInspectorFactory;
+        this.integerInspectorFactory = integerInspectorFactory;
+        this.printerFactoryProvider = printerFactoryProvider;
+        this.clusterLocations = clusterLocations;
+        this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
+        this.nullWriterFactory = nullWriterFactory;
+        this.expressionRuntimeProvider = expressionRuntimeProvider;
+        this.expressionTypeComputer = expressionTypeComputer;
+        this.typingContext = typingContext;
+        this.expressionEvalSizeComputer = expressionEvalSizeComputer;
+        this.partialAggregationTypeComputer = partialAggregationTypeComputer;
+        this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
+        this.frameSize = frameSize;
+        this.varCounter = 0;
+    }
 
-	public IOperatorSchema getOuterFlowSchema() {
-		return outerFlowSchema;
-	}
+    public IOperatorSchema getOuterFlowSchema() {
+        return outerFlowSchema;
+    }
 
-	public AlgebricksPartitionConstraint getClusterLocations() {
-		return clusterLocations;
-	}
+    public AlgebricksPartitionConstraint getClusterLocations() {
+        return clusterLocations;
+    }
 
-	public IMetadataProvider<?, ?> getMetadataProvider() {
-		return metadataProvider;
-	}
+    public IMetadataProvider<?, ?> getMetadataProvider() {
+        return metadataProvider;
+    }
 
-	public Object getAppContext() {
-		return appContext;
-	}
+    public Object getAppContext() {
+        return appContext;
+    }
 
-	public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
-		return serializerDeserializerProvider;
-	}
+    public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
+        return serializerDeserializerProvider;
+    }
 
-	public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
-		return hashFunctionFactoryProvider;
-	}
+    public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
+        return hashFunctionFactoryProvider;
+    }
 
-	public IBinaryHashFunctionFamilyProvider getBinaryHashFunctionFamilyProvider() {
-		return hashFunctionFamilyProvider;
-	}
+    public IBinaryHashFunctionFamilyProvider getBinaryHashFunctionFamilyProvider() {
+        return hashFunctionFamilyProvider;
+    }
 
-	public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
-		return comparatorFactoryProvider;
-	}
+    public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
+        return comparatorFactoryProvider;
+    }
 
-	public ITypeTraitProvider getTypeTraitProvider() {
-		return typeTraitProvider;
-	}
+    public ITypeTraitProvider getTypeTraitProvider() {
+        return typeTraitProvider;
+    }
 
-	public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
-		return booleanInspectorFactory;
-	}
+    public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
+        return booleanInspectorFactory;
+    }
 
-	public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
-		return integerInspectorFactory;
-	}
+    public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
+        return integerInspectorFactory;
+    }
 
-	public IPrinterFactoryProvider getPrinterFactoryProvider() {
-		return printerFactoryProvider;
-	}
-	
-	public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider(){
-		return predEvaluatorFactoryProvider;
-	}
+    public IPrinterFactoryProvider getPrinterFactoryProvider() {
+        return printerFactoryProvider;
+    }
 
-	public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
-		return expressionRuntimeProvider;
-	}
+    public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider() {
+        return predEvaluatorFactoryProvider;
+    }
 
-	public IOperatorSchema getSchema(ILogicalOperator op) {
-		return schemaMap.get(op);
-	}
+    public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
+        return expressionRuntimeProvider;
+    }
 
-	public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
-		schemaMap.put(op, schema);
-	}
+    public IOperatorSchema getSchema(ILogicalOperator op) {
+        return schemaMap.get(op);
+    }
 
-	public LogicalVariable createNewVar() {
-		varCounter++;
-		LogicalVariable var = new LogicalVariable(-varCounter);
-		return var;
-	}
+    public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
+        schemaMap.put(op, schema);
+    }
 
-	public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env)
-			throws AlgebricksException {
-		return expressionTypeComputer.getType(expr,
-				typingContext.getMetadataProvider(), env);
-	}
+    public LogicalVariable createNewVar() {
+        varCounter++;
+        LogicalVariable var = new LogicalVariable(-varCounter);
+        return var;
+    }
 
-	public INullWriterFactory getNullWriterFactory() {
-		return nullWriterFactory;
-	}
+    public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env) throws AlgebricksException {
+        return expressionTypeComputer.getType(expr, typingContext.getMetadataProvider(), env);
+    }
 
-	public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
-		return normalizedKeyComputerFactoryProvider;
-	}
+    public INullWriterFactory getNullWriterFactory() {
+        return nullWriterFactory;
+    }
 
-	public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
-		return expressionEvalSizeComputer;
-	}
+    public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
+        return normalizedKeyComputerFactoryProvider;
+    }
 
-	public int getFrameSize() {
-		return frameSize;
-	}
+    public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
+        return expressionEvalSizeComputer;
+    }
 
-	public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
-		return partialAggregationTypeComputer;
-	}
+    public int getFrameSize() {
+        return frameSize;
+    }
 
-	public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
-		return typingContext.getOutputTypeEnvironment(op);
-	}
+    public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
+        return partialAggregationTypeComputer;
+    }
+
+    public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
+        return typingContext.getOutputTypeEnvironment(op);
+    }
 
 }
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 9d72881..3b115eb 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -35,7 +35,9 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
@@ -55,6 +57,7 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
@@ -83,6 +86,7 @@
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import edu.uci.ics.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
 
 public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
 
@@ -489,9 +493,15 @@
                     if (ordCols == null || ordCols.size() == 0) {
                         pop = new RandomMergeExchangePOperator();
                     } else {
-                        OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
-                        sortColumns = ordCols.toArray(sortColumns);
-                        pop = new SortMergeExchangePOperator(sortColumns);
+                        if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
+                            IRangeMap rangeMap = (IRangeMap) op.getAnnotations().get(
+                                    OperatorAnnotations.USE_RANGE_CONNECTOR);
+                            pop = new RangePartitionMergePOperator(ordCols, domain, rangeMap);
+                        } else {
+                            OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
+                            sortColumns = ordCols.toArray(sortColumns);
+                            pop = new SortMergeExchangePOperator(sortColumns);
+                        }
                     }
                     break;
                 }
@@ -519,7 +529,7 @@
                     break;
                 }
                 case ORDERED_PARTITIONED: {
-                    pop = new RangePartitionPOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain);
+                    pop = new RangePartitionPOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain, null);
                     break;
                 }
                 case BROADCAST: {
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RepartitionComputerGeneratorFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RepartitionComputerFamily.java
similarity index 65%
rename from hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RepartitionComputerGeneratorFactory.java
rename to hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RepartitionComputerFamily.java
index df66531..3ac7cd9 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RepartitionComputerGeneratorFactory.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/RepartitionComputerFamily.java
@@ -19,29 +19,29 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
-public class RepartitionComputerGeneratorFactory implements ITuplePartitionComputerFamily{
-	
-	 private static final long serialVersionUID = 1L;
+public class RepartitionComputerFamily implements ITuplePartitionComputerFamily {
 
-	    private int factor;
-	    private ITuplePartitionComputerFamily delegateFactory;
+    private static final long serialVersionUID = 1L;
 
-	    public RepartitionComputerGeneratorFactory(int factor, ITuplePartitionComputerFamily delegate) {
-	        this.factor = factor;
-	        this.delegateFactory = delegate;
-	    }
+    private int factor;
+    private ITuplePartitionComputerFamily delegateFactory;
 
-	@Override
-	public ITuplePartitionComputer createPartitioner(int seed) {
-		final int s = seed;
-		return new ITuplePartitionComputer() {
+    public RepartitionComputerFamily(int factor, ITuplePartitionComputerFamily delegate) {
+        this.factor = factor;
+        this.delegateFactory = delegate;
+    }
+
+    @Override
+    public ITuplePartitionComputer createPartitioner(int seed) {
+        final int s = seed;
+        return new ITuplePartitionComputer() {
             private ITuplePartitionComputer delegate = delegateFactory.createPartitioner(s);
 
             @Override
             public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
-            	return delegate.partition(accessor, tIndex, factor * nParts) / factor;
+                return delegate.partition(accessor, tIndex, factor * nParts) / factor;
             }
         };
-	}
+    }
 
 }
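
The renamed RepartitionComputerFamily still folds the delegate's factor * nParts buckets back into nParts partitions via integer division. Purely illustrative arithmetic (no Hyracks types) showing that mapping:

    public final class RepartitionMathDemo {
        public static void main(String[] args) {
            int factor = 4;
            int nParts = 3;
            for (int delegateBucket = 0; delegateBucket < factor * nParts; delegateBucket++) {
                // Same integer division as partition(...) above: buckets 0..3 -> 0, 4..7 -> 1, 8..11 -> 2.
                System.out.println("delegate bucket " + delegateBucket + " -> partition " + (delegateBucket / factor));
            }
        }
    }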
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
new file mode 100644
index 0000000..3022c51
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.partition.range;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory {
+    private static final long serialVersionUID = 1L;
+    private final int[] rangeFields;
+    private IRangeMap rangeMap;
+    private IBinaryComparatorFactory[] comparatorFactories;
+
+    public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
+            IRangeMap rangeMap) {
+        this.rangeFields = rangeFields;
+        this.comparatorFactories = comparatorFactories;
+        this.rangeMap = rangeMap;
+    }
+
+    @Override
+    public ITuplePartitionComputer createPartitioner() {
+        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        return new ITuplePartitionComputer() {
+            /**
+             * Map a tuple to one of nParts consumer partitions based on its range slot.
+             */
+            @Override
+            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+                if (nParts == 1) {
+                    return 0;
+                }
+                int slotIndex = getRangePartition(accessor, tIndex);
+                // Map range partition to node partitions.
+                double rangesPerPart = 1;
+                if (rangeMap.getSplitCount() + 1 > nParts) {
+                    rangesPerPart = ((double) rangeMap.getSplitCount() + 1) / nParts;
+                }
+                return (int) Math.floor(slotIndex / rangesPerPart);
+            }
+
+            /*
+             * Determine the range partition.
+             */
+            public int getRangePartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
+                int slotIndex = 0;
+                for (int i = 0; i < rangeMap.getSplitCount(); ++i) {
+                    int c = compareSlotAndFields(accessor, tIndex, i);
+                    if (c < 0) {
+                        return slotIndex;
+                    }
+                    slotIndex++;
+                }
+                return slotIndex;
+            }
+
+            public int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int fieldIndex)
+                    throws HyracksDataException {
+                int c = 0;
+                int startOffset = accessor.getTupleStartOffset(tIndex);
+                int slotLength = accessor.getFieldSlotsLength();
+                for (int f = 0; f < comparators.length; ++f) {
+                    int fIdx = rangeFields[f];
+                    int fStart = accessor.getFieldStartOffset(tIndex, fIdx);
+                    int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
+                    c = comparators[f].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart, fEnd
+                            - fStart, rangeMap.getByteArray(fieldIndex, f), rangeMap.getStartOffset(fieldIndex, f),
+                            rangeMap.getLength(fieldIndex, f));
+                    if (c != 0) {
+                        return c;
+                    }
+                }
+                return c;
+            }
+
+        };
+    }
+}
\ No newline at end of file
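
The partitioner above first finds the range slot of a tuple (0 through getSplitCount(), one slot per range delimited by the split values) and then folds the slots onto the available consumer partitions. A standalone arithmetic sketch of that second step, mirroring the partition(...) body:

    public final class RangeSlotToPartitionDemo {
        static int mapSlot(int slotIndex, int splitCount, int nParts) {
            if (nParts == 1) {
                return 0;
            }
            double rangesPerPart = 1;
            if (splitCount + 1 > nParts) {
                rangesPerPart = ((double) splitCount + 1) / nParts;
            }
            return (int) Math.floor(slotIndex / rangesPerPart);
        }

        public static void main(String[] args) {
            // 7 split values give 8 range slots, folded onto 4 partitions (2 slots each).
            for (int slot = 0; slot <= 7; slot++) {
                System.out.println("slot " + slot + " -> partition " + mapSlot(slot, 7, 4));
            }
        }
    }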
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/IRangeMap.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/IRangeMap.java
new file mode 100644
index 0000000..a503146
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/IRangeMap.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hyracks.dataflow.common.data.partition.range;
+
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+
+public interface IRangeMap {
+    public IPointable getFieldSplit(int columnIndex, int splitIndex);
+
+    public int getSplitCount();
+
+    public byte[] getByteArray(int columnIndex, int splitIndex);
+
+    public int getStartOffset(int columnIndex, int splitIndex);
+
+    public int getLength(int columnIndex, int splitIndex);
+
+    public int getTag(int columnIndex, int splitIndex);
+}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/RangeMap.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/RangeMap.java
new file mode 100644
index 0000000..c290d24
--- /dev/null
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/partition/range/RangeMap.java
@@ -0,0 +1,80 @@
+package edu.uci.ics.hyracks.dataflow.common.data.partition.range;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
+
+/**
+ * The range map stores the field split values in a byte array:
+ * the first split value for each field, followed by the second split value for each field, and so on.
+ */
+public class RangeMap implements IRangeMap, Serializable {
+    private final int fields;
+    private final byte[] bytes;
+    private final int[] offsets;
+
+    public RangeMap(int fields, byte[] bytes, int[] offsets) {
+        this.fields = fields;
+        this.bytes = bytes;
+        this.offsets = offsets;
+    }
+
+    @Override
+    public IPointable getFieldSplit(int columnIndex, int splitIndex) {
+        IPointable p = VoidPointable.FACTORY.createPointable();
+        int index = getFieldIndex(columnIndex, splitIndex);
+        p.set(bytes, getFieldStart(index), getFieldLength(index));
+        return p;
+    }
+
+    @Override
+    public int getSplitCount() {
+        return offsets.length / fields;
+    }
+
+    @Override
+    public byte[] getByteArray(int columnIndex, int splitIndex) {
+        return bytes;
+    }
+
+    @Override
+    public int getTag(int columnIndex, int splitIndex) {
+        return getFieldTag(getFieldIndex(columnIndex, splitIndex));
+    }
+
+    @Override
+    public int getStartOffset(int columnIndex, int splitIndex) {
+        return getFieldStart(getFieldIndex(columnIndex, splitIndex));
+    }
+
+    @Override
+    public int getLength(int columnIndex, int splitIndex) {
+        return getFieldLength(getFieldIndex(columnIndex, splitIndex));
+    }
+
+    private int getFieldIndex(int columnIndex, int splitIndex) {
+        return splitIndex * fields + columnIndex;
+    }
+
+    private int getFieldTag(int index) {
+        return bytes[getFieldStart(index)];
+    }
+
+    private int getFieldStart(int index) {
+        int start = 0;
+        if (index != 0) {
+            start = offsets[index - 1];
+        }
+        return start;
+    }
+
+    private int getFieldLength(int index) {
+        int length = offsets[index];
+        if (index != 0) {
+            length -= offsets[index - 1];
+        }
+        return length;
+    }
+
+}
\ No newline at end of file
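
RangeMap lays the serialized split values out split-major (all fields of split 0, then all fields of split 1, and so on), with offsets[] holding the cumulative end offset of each value. A hedged sketch of building one by hand for a single ASCII-keyed field with two split points; real callers would serialize split values with the same serializers that back the comparators handed to FieldRangePartitionComputerFactory:

    import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
    import edu.uci.ics.hyracks.dataflow.common.data.partition.range.RangeMap;

    public final class RangeMapLayoutExample {
        public static IRangeMap build() {
            int fields = 1;
            byte[] bytes = new byte[] { 'g', 'p' };  // split 0 value, then split 1 value
            int[] offsets = new int[] { 1, 2 };      // cumulative end offsets of the values
            RangeMap map = new RangeMap(fields, bytes, offsets);
            // map.getSplitCount() == 2; map.getStartOffset(0, 1) == 1; map.getLength(0, 1) == 1
            return map;
        }
    }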
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 540c31b..c03f474 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -46,7 +46,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
 import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
-import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerGeneratorFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFamily;
 import edu.uci.ics.hyracks.dataflow.common.io.RunFileReader;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
@@ -404,10 +404,8 @@
                     state.hybridHJ.closeProbe(writer);
 
                     BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
-                    hpcRep0 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf0)
-                            .createPartitioner(0);
-                    hpcRep1 = new RepartitionComputerGeneratorFactory(state.numOfPartitions, hpcf1)
-                            .createPartitioner(0);
+                    hpcRep0 = new RepartitionComputerFamily(state.numOfPartitions, hpcf0).createPartitioner(0);
+                    hpcRep1 = new RepartitionComputerFamily(state.numOfPartitions, hpcf1).createPartitioner(0);
 
                     rPartbuff.clear();
                     for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) {
@@ -440,10 +438,10 @@
                     long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getFrameSize()) : (ohhj
                             .getProbePartitionSize(pid) / ctx.getFrameSize());
 
-                    LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId()
-                            + ") (pid " + pid + ") - (level " + level + ") - wasReversed " + wasReversed
-                            + " - BuildSize:\t" + buildPartSize + "\tProbeSize:\t" + probePartSize + " - MemForJoin "
-                            + (state.memForJoin) + "  - LeftOuter is " + isLeftOuter);
+                    LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId() + ") (pid "
+                            + pid + ") - (level " + level + ") - wasReversed " + wasReversed + " - BuildSize:\t"
+                            + buildPartSize + "\tProbeSize:\t" + probePartSize + " - MemForJoin " + (state.memForJoin)
+                            + "  - LeftOuter is " + isLeftOuter);
 
                     //Apply in-Mem HJ if possible
                     if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)