[NO ISSUE][HYR] Changes to Multi Partition Tests

-User model changes: no
-Storage format changes: no
-Interface changes: no

Details:

-Adds factories that can be used to perform Partition, Following, and Intersect partitioning as well as a set of tests for those factories.

Change-Id: I22cb8e1f4d47e881ab6a655d675dbfac08027764
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4164
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java
new file mode 100644
index 0000000..cefd38a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+abstract class AbstractFieldRangePartitionComputerFactory implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final RangeMapSupplier rangeMapSupplier;
+
+    private final IBinaryComparatorFactory[] comparatorFactories;
+
+    protected final SourceLocation sourceLoc;
+
+    AbstractFieldRangePartitionComputerFactory(RangeMapSupplier rangeMapSupplier,
+            IBinaryComparatorFactory[] comparatorFactories, SourceLocation sourceLoc) {
+        this.rangeMapSupplier = rangeMapSupplier;
+        this.comparatorFactories = comparatorFactories;
+        this.sourceLoc = sourceLoc;
+    }
+
+    private IBinaryComparator[] createBinaryComparators() {
+        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        return comparators;
+    }
+
+    private abstract class AbstractFieldRangePartitionComputer {
+
+        final IHyracksTaskContext taskContext;
+
+        final RangeMapPartitionComputer rangeMapPartitionComputer;
+
+        private AbstractFieldRangePartitionComputer(IHyracksTaskContext taskContext) {
+            this.taskContext = taskContext;
+            this.rangeMapPartitionComputer = new RangeMapPartitionComputer();
+        }
+
+        public void initialize() throws HyracksDataException {
+            rangeMapPartitionComputer.initialize(taskContext);
+        }
+    }
+
+    abstract class AbstractFieldRangeSinglePartitionComputer extends AbstractFieldRangePartitionComputer
+            implements ITuplePartitionComputer {
+
+        AbstractFieldRangeSinglePartitionComputer(IHyracksTaskContext taskContext) {
+            super(taskContext);
+        }
+
+        @Override
+        public final int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+            return nParts == 1 ? 0 : computePartition(accessor, tIndex, nParts);
+        }
+
+        protected abstract int computePartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+                throws HyracksDataException;
+    }
+
+    abstract class AbstractFieldRangeMultiPartitionComputer extends AbstractFieldRangePartitionComputer
+            implements ITupleMultiPartitionComputer {
+
+        private BitSet result;
+
+        AbstractFieldRangeMultiPartitionComputer(IHyracksTaskContext taskContext) {
+            super(taskContext);
+        }
+
+        @Override
+        public void initialize() throws HyracksDataException {
+            super.initialize();
+            if (result == null) {
+                result = new BitSet();
+            }
+        }
+
+        @Override
+        public final BitSet partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+                throws HyracksDataException {
+            result.clear();
+            if (nParts == 1) {
+                result.set(0);
+            } else {
+                int pStart = computeStartPartition(accessor, tIndex, nParts);
+                int pEnd = computeEndPartition(accessor, tIndex, nParts);
+                result.set(pStart, pEnd + 1);
+            }
+            return result;
+        }
+
+        protected abstract int computeStartPartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+                throws HyracksDataException;
+
+        protected abstract int computeEndPartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+                throws HyracksDataException;
+    }
+
+    final class RangeMapPartitionComputer {
+
+        private RangeMap rangeMap;
+
+        private IBinaryComparator[] comparators;
+
+        protected void initialize(IHyracksTaskContext taskContext) throws HyracksDataException {
+            rangeMap = rangeMapSupplier.getRangeMap(taskContext);
+            if (rangeMap == null) {
+                throw HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND, sourceLoc);
+            }
+            if (comparators == null) {
+                comparators = createBinaryComparators();
+            }
+        }
+
+        int partition(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields, int nParts)
+                throws HyracksDataException {
+            int slotIndex = findRangeMapSlot(accessor, tIndex, rangeFields);
+            return mapRangeMapSlotToPartition(slotIndex, nParts);
+        }
+
+        int exclusivePartition(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields, int nParts)
+                throws HyracksDataException {
+            int slotIndex = findRangeMapExclusiveSlot(accessor, tIndex, rangeFields);
+            return mapRangeMapSlotToPartition(slotIndex, nParts);
+        }
+
+        private int mapRangeMapSlotToPartition(int slotIndex, int nParts) {
+            // Map range partition to node partitions.
+            double rangesPerPart = 1;
+            if (rangeMap.getSplitCount() + 1 > nParts) {
+                rangesPerPart = ((double) rangeMap.getSplitCount() + 1) / nParts;
+            }
+            return (int) Math.floor(slotIndex / rangesPerPart);
+        }
+
+        private int findRangeMapSlot(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields)
+                throws HyracksDataException {
+            int slotIndex = 0;
+            for (int slotNumber = 0, n = rangeMap.getSplitCount(); slotNumber < n; ++slotNumber) {
+                int c = compareSlotAndFields(accessor, tIndex, rangeFields, slotNumber);
+                if (c < 0) {
+                    return slotIndex;
+                }
+                slotIndex++;
+            }
+            return slotIndex;
+        }
+
+        private int findRangeMapExclusiveSlot(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields)
+                throws HyracksDataException {
+            int slotIndex = 0;
+            for (int slotNumber = 0, n = rangeMap.getSplitCount(); slotNumber < n; ++slotNumber) {
+                int c = compareSlotAndFields(accessor, tIndex, rangeFields, slotNumber);
+                if (c <= 0) {
+                    return slotIndex;
+                }
+                slotIndex++;
+            }
+            return slotIndex;
+        }
+
+        private int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields, int slotNumber)
+                throws HyracksDataException {
+            int c = 0;
+            int startOffset = accessor.getTupleStartOffset(tIndex);
+            int slotLength = accessor.getFieldSlotsLength();
+            for (int fieldNum = 0; fieldNum < comparators.length; ++fieldNum) {
+                int fIdx = rangeFields[fieldNum];
+                int fStart = accessor.getFieldStartOffset(tIndex, fIdx);
+                int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
+                c = comparators[fieldNum].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart,
+                        fEnd - fStart, rangeMap.getByteArray(), rangeMap.getStartOffset(fieldNum, slotNumber),
+                        rangeMap.getLength(fieldNum, slotNumber));
+                if (c != 0) {
+                    return c;
+                }
+            }
+            return c;
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangeFollowingPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangeFollowingPartitionComputerFactory.java
new file mode 100644
index 0000000..5a4511b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangeFollowingPartitionComputerFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public final class FieldRangeFollowingPartitionComputerFactory extends AbstractFieldRangePartitionComputerFactory
+        implements ITupleMultiPartitionComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] rangeFields;
+
+    public FieldRangeFollowingPartitionComputerFactory(int[] rangeFields,
+            IBinaryComparatorFactory[] comparatorFactories, RangeMapSupplier rangeMapSupplier,
+            SourceLocation sourceLocation) {
+        super(rangeMapSupplier, comparatorFactories, sourceLocation);
+        this.rangeFields = rangeFields;
+    }
+
+    @Override
+    public ITupleMultiPartitionComputer createPartitioner(IHyracksTaskContext taskContext) {
+        return new AbstractFieldRangeMultiPartitionComputer(taskContext) {
+            @Override
+            protected int computeStartPartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+                    throws HyracksDataException {
+                return rangeMapPartitionComputer.partition(accessor, tIndex, rangeFields, nParts);
+            }
+
+            @Override
+            protected int computeEndPartition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
+                return nParts - 1;
+            }
+        };
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangeIntersectPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangeIntersectPartitionComputerFactory.java
new file mode 100644
index 0000000..211a69a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangeIntersectPartitionComputerFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public final class FieldRangeIntersectPartitionComputerFactory extends AbstractFieldRangePartitionComputerFactory
+        implements ITupleMultiPartitionComputerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] startFields;
+
+    private final int[] endFields;
+
+    public FieldRangeIntersectPartitionComputerFactory(int[] startFields, int[] endFields,
+            IBinaryComparatorFactory[] comparatorFactories, RangeMapSupplier rangeMapSupplier,
+            SourceLocation sourceLocation) {
+        super(rangeMapSupplier, comparatorFactories, sourceLocation);
+        this.startFields = startFields;
+        this.endFields = endFields;
+    }
+
+    @Override
+    public ITupleMultiPartitionComputer createPartitioner(IHyracksTaskContext taskContext) {
+        return new AbstractFieldRangeMultiPartitionComputer(taskContext) {
+            @Override
+            protected int computeStartPartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+                    throws HyracksDataException {
+                return rangeMapPartitionComputer.partition(accessor, tIndex, startFields, nParts);
+            }
+
+            @Override
+            protected int computeEndPartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+                    throws HyracksDataException {
+                return rangeMapPartitionComputer.exclusivePartition(accessor, tIndex, endFields, nParts);
+            }
+        };
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
index 1831a5f..0816f2d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -20,92 +20,33 @@
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 
-public final class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory {
+public final class FieldRangePartitionComputerFactory extends AbstractFieldRangePartitionComputerFactory
+        implements ITuplePartitionComputerFactory {
+
     private static final long serialVersionUID = 1L;
+
     private final int[] rangeFields;
-    private final IBinaryComparatorFactory[] comparatorFactories;
-    private final RangeMapSupplier rangeMapSupplier;
-    private final SourceLocation sourceLocation;
 
     public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
             RangeMapSupplier rangeMapSupplier, SourceLocation sourceLocation) {
+        super(rangeMapSupplier, comparatorFactories, sourceLocation);
         this.rangeFields = rangeFields;
-        this.rangeMapSupplier = rangeMapSupplier;
-        this.comparatorFactories = comparatorFactories;
-        this.sourceLocation = sourceLocation;
     }
 
     @Override
     public ITuplePartitionComputer createPartitioner(IHyracksTaskContext taskContext) {
-        final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-        for (int i = 0; i < comparatorFactories.length; ++i) {
-            comparators[i] = comparatorFactories[i].createBinaryComparator();
-        }
-
-        return new ITuplePartitionComputer() {
-            private RangeMap rangeMap;
-
+        return new AbstractFieldRangeSinglePartitionComputer(taskContext) {
             @Override
-            public void initialize() throws HyracksDataException {
-                rangeMap = rangeMapSupplier.getRangeMap(taskContext);
-                if (rangeMap == null) {
-                    throw HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND, sourceLocation);
-                }
-            }
-
-            @Override
-            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
-                if (nParts == 1) {
-                    return 0;
-                }
-                int slotIndex = getRangePartition(accessor, tIndex);
-                // Map range partition to node partitions.
-                double rangesPerPart = 1;
-                if (rangeMap.getSplitCount() + 1 > nParts) {
-                    rangesPerPart = ((double) rangeMap.getSplitCount() + 1) / nParts;
-                }
-                return (int) Math.floor(slotIndex / rangesPerPart);
-            }
-
-            private int getRangePartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
-                int slotIndex = 0;
-                for (int slotNumber = 0; slotNumber < rangeMap.getSplitCount(); ++slotNumber) {
-                    int c = compareSlotAndFields(accessor, tIndex, slotNumber);
-                    if (c < 0) {
-                        return slotIndex;
-                    }
-                    slotIndex++;
-                }
-                return slotIndex;
-            }
-
-            private int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int slotNumber)
+            protected int computePartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
                     throws HyracksDataException {
-                int c = 0;
-                int startOffset = accessor.getTupleStartOffset(tIndex);
-                int slotLength = accessor.getFieldSlotsLength();
-                for (int fieldNum = 0; fieldNum < comparators.length; ++fieldNum) {
-                    int fIdx = rangeFields[fieldNum];
-                    int fStart = accessor.getFieldStartOffset(tIndex, fIdx);
-                    int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
-                    c = comparators[fieldNum].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart,
-                            fEnd - fStart, rangeMap.getByteArray(), rangeMap.getStartOffset(fieldNum, slotNumber),
-                            rangeMap.getLength(fieldNum, slotNumber));
-                    if (c != 0) {
-                        return c;
-                    }
-                }
-                return c;
+                return rangeMapPartitionComputer.partition(accessor, tIndex, rangeFields, nParts);
             }
-
         };
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/pom.xml b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/pom.xml
new file mode 100644
index 0000000..9ffa000
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/pom.xml
@@ -0,0 +1,82 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>hyracks-dataflow-common-test</artifactId>
+
+    <parent>
+        <groupId>org.apache.hyracks</groupId>
+        <artifactId>hyracks-tests</artifactId>
+        <version>0.3.5-SNAPSHOT</version>
+    </parent>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <properties>
+        <root.dir>${basedir}/../../..</root.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-data-std</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-dataflow-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-test-support</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java
new file mode 100644
index 0000000..a61cd40
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.accessors.LongBinaryComparatorFactory;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+
+import junit.framework.TestCase;
+
+/**
+ * These tests check the range partitioning types with various interval sizes and range map split points.
+ * For each range type they check the ASCending comparators for intervals with durations of D = 3, and
+ * a range map of the overall range that has been split into N = 4 parts.
+ * the test for the Split type also checks larger intervals and more splits on the range map to make sure it splits
+ * correctly across many partitions, and within single partitions.
+ * <p>
+ * Currently, this test does not support DESCending intervals, and there is no test case that checks which interval
+ * is included in split if the ending point lands on the point between two partitions.
+ * <p>
+ * The map of the partitions, listed as the rangeMap split points in ascending order:
+ * <p>
+ * The following points (X) will be tested for these 4 partitions.
+ * <p>
+ * X  -----------X----------XXX----------X----------XXX----------X------------XXX------------X------------  X
+ * -----------------------|-----------------------|-------------------------|--------------------------
+ * <p>
+ * The following points (X) will be tested for these 16 partitions.
+ * <p>
+ * X  -----------X----------XXX----------X----------XXX----------X------------XXX------------X------------  X
+ * -----|-----|-----|-----|-----|-----|-----|-----|-----|-----|------|------|------|------|------|-----
+ * <p>
+ * N4                0          )[           1          )[           2            )[             3
+ * N16     0  )[  1 )[  2 )[  3 )[  4 )[  5 )[  6 )[  7 )[  8 )[  9 )[  10 )[  11 )[  12 )[  13 )[  14 )[  15
+ * ASC   0     25    50    75    100   125   150   175   200   225   250    275    300    325    350    375    400
+ * <p>
+ * First and last partitions include all values less than and greater than min and max split points respectively.
+ * <p>
+ * Here are the test points inside EACH_PARTITION:
+ * result index {      0,   1,   2,   3,    4,    5,    6,    7,    8,    9,   10,   11,   12,   13,   14,   15   };
+ * points       {     20l, 45l, 70l, 95l, 120l, 145l, 170l, 195l, 220l, 245l, 270l, 295l, 320l, 345l, 370l, 395l  };
+ * <p>
+ * PARTITION_EDGE_CASES: Tests points at or near partition boundaries and at the ends of the partition range.
+ * result index {      0,   1,   2,   3,    4,    5,    6,    7,    8,    9,    10,   11,   12,   13,   14        };
+ * points       {    -25l, 50l, 99l, 100l, 101l, 150l, 199l, 200l, 201l, 250l, 299l, 300l, 301l, 350l, 425l       };
+ * <p>
+ * MAP_POINTS: The map of the partitions, listed as the split points.
+ * partitions   {  0,   1,   2,   3,    4,    5,    6,    7,    8,    9,   10,   11,   12,   13,   14,   15,   16 };
+ * map          { 0l, 25l, 50l, 75l, 100l, 125l, 150l, 175l, 200l, 225l, 250l, 275l, 300l, 325l, 350l, 375l, 400l };
+ * <p>
+ * Both rangeMap partitions and test intervals are end exclusive.
+ * an ascending test interval ending on 200 like (190, 200) is not in partition 8.
+ * <p>
+ * The Following, Intersect, and Partition partitioning is based off of Split, Replicate, Project in
+ * "Processing Interval Joins On Map-Reduce" by Chawda, et. al.
+ */
+public abstract class AbstractFieldRangeMultiPartitionComputerFactoryTest extends TestCase {
+
+    protected static final Long[] EACH_PARTITION =
+            new Long[] { 20l, 45l, 70l, 95l, 120l, 145l, 170l, 195l, 220l, 245l, 270l, 295l, 320l, 345l, 370l, 395l };
+    protected static final Long[] PARTITION_EDGE_CASES =
+            new Long[] { -25l, 50l, 99l, 100l, 101l, 150l, 199l, 200l, 201l, 250l, 299l, 300l, 301l, 350l, 425l };
+    protected static final Long[] MAP_POINTS =
+            new Long[] { 25l, 50l, 75l, 100l, 125l, 150l, 175l, 200l, 225l, 250l, 275l, 300l, 325l, 350l, 375l };
+    private final Integer64SerializerDeserializer integerSerde = Integer64SerializerDeserializer.INSTANCE;
+    @SuppressWarnings("rawtypes")
+    private final ISerializerDeserializer[] twoIntegerSerDers = new ISerializerDeserializer[] {
+            Integer64SerializerDeserializer.INSTANCE, Integer64SerializerDeserializer.INSTANCE };
+    private final RecordDescriptor recordIntegerDesc = new RecordDescriptor(twoIntegerSerDers);
+    private static final int FRAME_SIZE = 640;
+    private static final int INTEGER_LENGTH = Long.BYTES;
+    static final IBinaryComparatorFactory[] BINARY_ASC_COMPARATOR_FACTORIES =
+            new IBinaryComparatorFactory[] { LongBinaryComparatorFactory.INSTANCE };
+    static final IBinaryComparatorFactory[] BINARY_DESC_COMPARATOR_FACTORIES =
+            new IBinaryComparatorFactory[] { LongDescBinaryComparatorFactory.INSTANCE };
+    protected static final int[] START_FIELD = new int[] { 0 };
+    protected static final int[] END_FIELD = new int[] { 1 };
+
+    private byte[] getIntegerBytes(Long[] integers) throws HyracksDataException {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutput dos = new DataOutputStream(bos);
+            for (int i = 0; i < integers.length; ++i) {
+                integerSerde.serialize(integers[i], dos);
+            }
+            bos.close();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    protected RangeMap getIntegerRangeMap(Long[] integers) throws HyracksDataException {
+        int[] offsets = new int[integers.length];
+        for (int i = 0; i < integers.length; ++i) {
+            offsets[i] = (i + 1) * INTEGER_LENGTH;
+        }
+        return new RangeMap(1, getIntegerBytes(integers), offsets);
+    }
+
+    private ByteBuffer prepareData(IHyracksTaskContext ctx, Long[] startPoints, Long duration)
+            throws HyracksDataException {
+        IFrame frame = new VSizeFrame(ctx);
+
+        FrameTupleAppender appender = new FrameTupleAppender();
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(recordIntegerDesc.getFieldCount());
+        DataOutput dos = tb.getDataOutput();
+        appender.reset(frame, true);
+
+        for (int i = 0; i < startPoints.length; ++i) {
+            tb.reset();
+            integerSerde.serialize(startPoints[i], dos);
+            tb.addFieldEndOffset();
+            integerSerde.serialize(startPoints[i] + duration, dos);
+            tb.addFieldEndOffset();
+            appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        }
+        return frame.getBuffer();
+    }
+
+    protected void executeFieldRangeFollowingPartitionTests(Long[] integers, RangeMap rangeMap,
+            IBinaryComparatorFactory[] minComparatorFactories, int nParts, int[][] results, long duration,
+            int[] rangeFields) throws HyracksDataException {
+
+        StaticRangeMapSupplier rangeMapSupplier = new StaticRangeMapSupplier(rangeMap);
+        SourceLocation sourceLocation = new SourceLocation(0, 0);
+
+        ITupleMultiPartitionComputerFactory itmpcf = new FieldRangeFollowingPartitionComputerFactory(rangeFields,
+                minComparatorFactories, rangeMapSupplier, sourceLocation);
+
+        executeFieldRangeMultiPartitionTests(integers, itmpcf, nParts, results, duration);
+
+    }
+
+    protected void executeFieldRangeIntersectPartitionTests(Long[] integers, RangeMap rangeMap,
+            IBinaryComparatorFactory[] minComparatorFactories, int nParts, int[][] results, long duration,
+            int[] startFields, int[] endFields) throws HyracksDataException {
+
+        StaticRangeMapSupplier rangeMapSupplier = new StaticRangeMapSupplier(rangeMap);
+        SourceLocation sourceLocation = new SourceLocation(0, 0);
+
+        ITupleMultiPartitionComputerFactory itmpcf = new FieldRangeIntersectPartitionComputerFactory(startFields,
+                endFields, minComparatorFactories, rangeMapSupplier, sourceLocation);
+
+        executeFieldRangeMultiPartitionTests(integers, itmpcf, nParts, results, duration);
+    }
+
+    protected void executeFieldRangePartitionTests(Long[] integers, RangeMap rangeMap,
+            IBinaryComparatorFactory[] minComparatorFactories, int nParts, int[][] results, long duration,
+            int[] rangeFields) throws HyracksDataException {
+
+        StaticRangeMapSupplier rangeMapSupplier = new StaticRangeMapSupplier(rangeMap);
+        SourceLocation sourceLocation = new SourceLocation(0, 0);
+
+        ITuplePartitionComputerFactory itpcf = new FieldRangePartitionComputerFactory(rangeFields,
+                minComparatorFactories, rangeMapSupplier, sourceLocation);
+
+        executeFieldRangePartitionTests(integers, itpcf, nParts, results, duration);
+
+    }
+
+    protected void executeFieldRangeMultiPartitionTests(Long[] integers, ITupleMultiPartitionComputerFactory itmpcf,
+            int nParts, int[][] results, long duration) throws HyracksDataException {
+        IHyracksTaskContext ctx = TestUtils.create(FRAME_SIZE);
+
+        ITupleMultiPartitionComputer partitioner = itmpcf.createPartitioner(ctx);
+        partitioner.initialize();
+
+        IFrameTupleAccessor accessor = new FrameTupleAccessor(recordIntegerDesc);
+        ByteBuffer buffer = prepareData(ctx, integers, duration);
+        accessor.reset(buffer);
+
+        for (int i = 0; i < results.length; ++i) {
+            BitSet map = partitioner.partition(accessor, i, nParts);
+            checkPartitionResult(results[i], map);
+        }
+    }
+
+    protected void executeFieldRangePartitionTests(Long[] integers, ITuplePartitionComputerFactory itpcf, int nParts,
+            int[][] results, long duration) throws HyracksDataException {
+        IHyracksTaskContext ctx = TestUtils.create(FRAME_SIZE);
+
+        ITuplePartitionComputer partitioner = itpcf.createPartitioner(ctx);
+        partitioner.initialize();
+
+        IFrameTupleAccessor accessor = new FrameTupleAccessor(recordIntegerDesc);
+        ByteBuffer buffer = prepareData(ctx, integers, duration);
+        accessor.reset(buffer);
+
+        int partition;
+
+        for (int i = 0; i < results.length; ++i) {
+            partition = partitioner.partition(accessor, i, nParts);
+            Assert.assertEquals("The partitions do not match for test " + i + ".", results[i][0], partition);
+        }
+    }
+
+    private String getString(int[] results) {
+        String result = "[";
+        for (int i = 0; i < results.length; ++i) {
+            result += results[i];
+            if (i < results.length - 1) {
+                result += ", ";
+            }
+        }
+        result += "]";
+        return result;
+    }
+
+    private String getString(BitSet results) {
+        int count = 0;
+        String result = "[";
+        for (int i = 0; i < results.length(); ++i) {
+            if (results.get(i)) {
+                if (count > 0) {
+                    result += ", ";
+                }
+                result += i;
+                count++;
+            }
+        }
+        result += "]";
+        return result;
+    }
+
+    private void checkPartitionResult(int[] results, BitSet map) {
+        Assert.assertTrue("The number of partitions in the Bitset:(" + map.cardinality() + ") and the results:("
+                + results.length + ") do not match.", results.length == map.cardinality());
+        for (int i = 0; i < results.length; ++i) {
+            Assert.assertTrue("The map partition " + getString(map) + " and the results " + getString(results)
+                    + " do not match. 2.", map.get(results[i]));
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangeFollowingPartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangeFollowingPartitionComputerFactoryTest.java
new file mode 100644
index 0000000..a49124a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangeFollowingPartitionComputerFactoryTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Test;
+
+public class FieldRangeFollowingPartitionComputerFactoryTest
+        extends AbstractFieldRangeMultiPartitionComputerFactoryTest {
+
+    @Test
+    public void testFRMPCF_Replicate_ASC_D3_N4_EDGE() throws HyracksDataException {
+        int[][] results = new int[15][];
+        results[0] = new int[] { 0, 1, 2, 3 }; // -25:-22
+        results[1] = new int[] { 0, 1, 2, 3 }; //  50:53
+        results[2] = new int[] { 0, 1, 2, 3 }; //  99:102
+        results[3] = new int[] { 1, 2, 3 }; // 100:103
+        results[4] = new int[] { 1, 2, 3 }; // 101:104
+        results[5] = new int[] { 1, 2, 3 }; // 150:153
+        results[6] = new int[] { 1, 2, 3 }; // 199:202
+        results[7] = new int[] { 2, 3 }; // 200:203
+        results[8] = new int[] { 2, 3 }; // 201:204
+        results[9] = new int[] { 2, 3 }; // 250:253
+        results[10] = new int[] { 2, 3 }; // 299:302
+        results[11] = new int[] { 3 }; // 300:303
+        results[12] = new int[] { 3 }; // 301:304
+        results[13] = new int[] { 3 }; // 350:353
+        results[14] = new int[] { 3 }; // 425:428
+
+        RangeMap rangeMap = getIntegerRangeMap(MAP_POINTS);
+
+        executeFieldRangeFollowingPartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES, 4,
+                results, 3, START_FIELD);
+    }
+
+    @Test
+    public void testFRMPCF_Replicate_DESC_D3_N4_EDGE() throws HyracksDataException {
+        int[][] results = new int[15][];
+        results[0] = new int[] { 3 }; // -25:22
+        results[1] = new int[] { 3 }; //  50:53
+        results[2] = new int[] { 2, 3 }; //  99:102
+        results[3] = new int[] { 2, 3 }; // 100:103
+        results[4] = new int[] { 2, 3 }; // 101:104
+        results[5] = new int[] { 2, 3 }; // 150:153
+        results[6] = new int[] { 1, 2, 3 }; // 199:202
+        results[7] = new int[] { 1, 2, 3 }; // 200:203
+        results[8] = new int[] { 1, 2, 3 }; // 201:204
+        results[9] = new int[] { 1, 2, 3 }; // 250:253
+        results[10] = new int[] { 0, 1, 2, 3 }; // 299:302
+        results[11] = new int[] { 0, 1, 2, 3 }; // 300:303
+        results[12] = new int[] { 0, 1, 2, 3 }; // 301:304
+        results[13] = new int[] { 0, 1, 2, 3 }; // 350:353
+        results[14] = new int[] { 0, 1, 2, 3 }; // 425:428
+
+        Long[] map = MAP_POINTS.clone();
+        ArrayUtils.reverse(map);
+        RangeMap rangeMap = getIntegerRangeMap(map);
+        int[] rangeFields = new int[] { 1 };
+
+        executeFieldRangeFollowingPartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES, 4,
+                results, 3, END_FIELD);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangeIntersectPartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangeIntersectPartitionComputerFactoryTest.java
new file mode 100644
index 0000000..edf9d7a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangeIntersectPartitionComputerFactoryTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Test;
+
+public class FieldRangeIntersectPartitionComputerFactoryTest
+        extends AbstractFieldRangeMultiPartitionComputerFactoryTest {
+
+    @Test
+    public void testFRMPCF_Split_ASC_D3_N4_EDGE() throws HyracksDataException {
+        int[][] results = new int[15][];
+        results[0] = new int[] { 0 }; // -25:-22
+        results[1] = new int[] { 0 }; //  50:53
+        results[2] = new int[] { 0, 1 }; //  99:102
+        results[3] = new int[] { 1 }; // 100:103
+        results[4] = new int[] { 1 }; // 101:104
+        results[5] = new int[] { 1 }; // 150:153
+        results[6] = new int[] { 1, 2 }; // 199:202
+        results[7] = new int[] { 2 }; // 200:203
+        results[8] = new int[] { 2 }; // 201:204
+        results[9] = new int[] { 2 }; // 250:253
+        results[10] = new int[] { 2, 3 }; // 299:302
+        results[11] = new int[] { 3 }; // 300:303
+        results[12] = new int[] { 3 }; // 301:304
+        results[13] = new int[] { 3 }; // 350:353
+        results[14] = new int[] { 3 }; // 425:428
+
+        RangeMap rangeMap = getIntegerRangeMap(MAP_POINTS);
+
+        executeFieldRangeIntersectPartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES, 4,
+                results, 3, START_FIELD, END_FIELD);
+    }
+
+    @Test
+    public void testFRMPCF_Split_DESC_D3_N4_EDGE() throws HyracksDataException {
+        int[][] results = new int[15][];
+        results[0] = new int[] { 3 }; // -25:-22
+        results[1] = new int[] { 3 }; //  50:53
+        results[2] = new int[] { 2, 3 }; //  99:102
+        results[3] = new int[] { 2 }; // 100:103
+        results[4] = new int[] { 2 }; // 101:104
+        results[5] = new int[] { 2 }; // 150:153
+        results[6] = new int[] { 1, 2 }; // 199:202
+        results[7] = new int[] { 1 }; // 200:203
+        results[8] = new int[] { 1 }; // 201:204
+        results[9] = new int[] { 1 }; // 250:253
+        results[10] = new int[] { 0, 1 }; // 299:302
+        results[11] = new int[] { 0 }; // 300:303
+        results[12] = new int[] { 0 }; // 301:304
+        results[13] = new int[] { 0 }; // 350:353
+        results[14] = new int[] { 0 }; // 425:428
+
+        Long[] map = MAP_POINTS.clone();
+        ArrayUtils.reverse(map);
+        RangeMap rangeMap = getIntegerRangeMap(map);
+
+        executeFieldRangeIntersectPartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES, 4,
+                results, 3, END_FIELD, START_FIELD);
+    }
+
+    @Test
+    public void testFRMPCF_Split_ASC_D50_N16_EDGE() throws HyracksDataException {
+        int[][] results = new int[15][];
+        results[0] = new int[] { 0 }; // -25:25
+        results[1] = new int[] { 2, 3 }; // 50:100
+        results[2] = new int[] { 3, 4, 5 }; // 99:149
+        results[3] = new int[] { 4, 5 }; // 100:150
+        results[4] = new int[] { 4, 5, 6 }; // 101:151
+        results[5] = new int[] { 6, 7 }; // 150:200
+        results[6] = new int[] { 7, 8, 9 }; // 199:249
+        results[7] = new int[] { 8, 9 }; // 200:250
+        results[8] = new int[] { 8, 9, 10 }; // 201:251
+        results[9] = new int[] { 10, 11 }; // 250:300
+        results[10] = new int[] { 11, 12, 13 }; // 299:349
+        results[11] = new int[] { 12, 13 }; // 300:350
+        results[12] = new int[] { 12, 13, 14 }; // 301:351
+        results[13] = new int[] { 14, 15 }; // 350:400
+        results[14] = new int[] { 15 }; // 425:475
+
+        RangeMap rangeMap = getIntegerRangeMap(MAP_POINTS);
+
+        executeFieldRangeIntersectPartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES, 16,
+                results, 50, START_FIELD, END_FIELD);
+    }
+
+    @Test
+    public void testFRMPCF_Split_DESC_D50_N16_EDGE() throws HyracksDataException {
+        int[][] results = new int[15][];
+        results[0] = new int[] { 15 }; // -25:25
+        results[1] = new int[] { 12, 13 }; // 50:100
+        results[2] = new int[] { 10, 11, 12 }; // 99:149
+        results[3] = new int[] { 10, 11 }; // 100:150
+        results[4] = new int[] { 9, 10, 11 }; // 101:151
+        results[5] = new int[] { 8, 9 }; // 150:200
+        results[6] = new int[] { 6, 7, 8 }; // 199:249
+        results[7] = new int[] { 6, 7 }; // 200:250
+        results[8] = new int[] { 5, 6, 7 }; // 201:251
+        results[9] = new int[] { 4, 5 }; // 250:300
+        results[10] = new int[] { 2, 3, 4 }; // 299:349
+        results[11] = new int[] { 2, 3 }; // 300:350
+        results[12] = new int[] { 1, 2, 3 }; // 301:351
+        results[13] = new int[] { 0, 1 }; // 350:400
+        results[14] = new int[] { 0 }; // 425:475
+
+        Long[] map = MAP_POINTS.clone();
+        ArrayUtils.reverse(map);
+        RangeMap rangeMap = getIntegerRangeMap(map);
+
+        executeFieldRangeIntersectPartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES, 16,
+                results, 50, END_FIELD, START_FIELD);
+    }
+
+    @Test
+    public void testFRMPCF_Split_ASC_D3_N16_EACH() throws HyracksDataException {
+        int[][] results = new int[16][];
+        results[0] = new int[] { 0 }; // 20:23
+        results[1] = new int[] { 1 }; // 45:48
+        results[2] = new int[] { 2 }; // 70:73
+        results[3] = new int[] { 3 }; // 95:98
+        results[4] = new int[] { 4 }; // 120:123
+        results[5] = new int[] { 5 }; // 145:148
+        results[6] = new int[] { 6 }; // 170:173
+        results[7] = new int[] { 7 }; // 195:198
+        results[8] = new int[] { 8 }; // 220:223
+        results[9] = new int[] { 9 }; // 245:248
+        results[10] = new int[] { 10 }; // 270:273
+        results[11] = new int[] { 11 }; // 295:298
+        results[12] = new int[] { 12 }; // 320:323
+        results[13] = new int[] { 13 }; // 345:348
+        results[14] = new int[] { 14 }; // 370:373
+        results[15] = new int[] { 15 }; // 395:398
+
+        RangeMap rangeMap = getIntegerRangeMap(MAP_POINTS);
+
+        executeFieldRangeIntersectPartitionTests(EACH_PARTITION, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES, 16, results,
+                3, START_FIELD, END_FIELD);
+    }
+
+    @Test
+    public void testFRMPCF_Split_DESC_D3_N16_EACH() throws HyracksDataException {
+        int[][] results = new int[16][];
+        results[0] = new int[] { 15 }; // 20:23
+        results[1] = new int[] { 14 }; // 45:48
+        results[2] = new int[] { 13 }; // 70:73
+        results[3] = new int[] { 12 }; // 95:98
+        results[4] = new int[] { 11 }; // 120:123
+        results[5] = new int[] { 10 }; // 145:148
+        results[6] = new int[] { 9 }; // 170:173
+        results[7] = new int[] { 8 }; // 195:198
+        results[8] = new int[] { 7 }; // 220:223
+        results[9] = new int[] { 6 }; // 245:248
+        results[10] = new int[] { 5 }; // 270:273
+        results[11] = new int[] { 4 }; // 295:298
+        results[12] = new int[] { 3 }; // 320:323
+        results[13] = new int[] { 2 }; // 345:348
+        results[14] = new int[] { 1 }; // 370:373
+        results[15] = new int[] { 0 }; // 395:398
+
+        Long[] map = MAP_POINTS.clone();
+        ArrayUtils.reverse(map);
+        RangeMap rangeMap = getIntegerRangeMap(map);
+
+        executeFieldRangeIntersectPartitionTests(EACH_PARTITION, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES, 16,
+                results, 3, END_FIELD, START_FIELD);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactoryTest.java
new file mode 100644
index 0000000..3efe989
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactoryTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Test;
+
+public class FieldRangePartitionComputerFactoryTest extends AbstractFieldRangeMultiPartitionComputerFactoryTest {
+
+    @Test
+    public void testFRMPCF_Project_ASC_D3_N4_EDGE() throws HyracksDataException {
+        int[][] results = new int[15][];
+        results[0] = new int[] { 0 }; // -25:-22
+        results[1] = new int[] { 0 }; //  50:53
+        results[2] = new int[] { 0 }; //  99:102
+        results[3] = new int[] { 1 }; // 100:103
+        results[4] = new int[] { 1 }; // 101:104
+        results[5] = new int[] { 1 }; // 150:153
+        results[6] = new int[] { 1 }; // 199:202
+        results[7] = new int[] { 2 }; // 200:203
+        results[8] = new int[] { 2 }; // 201:204
+        results[9] = new int[] { 2 }; // 250:253
+        results[10] = new int[] { 2 }; // 299:302
+        results[11] = new int[] { 3 }; // 300:303
+        results[12] = new int[] { 3 }; // 301:304
+        results[13] = new int[] { 3 }; // 350:353
+        results[14] = new int[] { 3 }; // 425:428
+
+        RangeMap rangeMap = getIntegerRangeMap(MAP_POINTS);
+        int[] rangeFields = new int[] { 0 };
+
+        executeFieldRangePartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES, 4, results, 3,
+                START_FIELD);
+    }
+
+    @Test
+    public void testFRMPCF_Project_DESC_D3_N4_EDGE() throws HyracksDataException {
+        int[][] results = new int[15][];
+        results[0] = new int[] { 3 }; // -25:-22
+        results[1] = new int[] { 3 }; //  50:53
+        results[2] = new int[] { 2 }; //  99:102
+        results[3] = new int[] { 2 }; // 100:103
+        results[4] = new int[] { 2 }; // 101:104
+        results[5] = new int[] { 2 }; // 150:153
+        results[6] = new int[] { 1 }; // 199:202
+        results[7] = new int[] { 1 }; // 200:203
+        results[8] = new int[] { 1 }; // 201:204
+        results[9] = new int[] { 1 }; // 250:253
+        results[10] = new int[] { 0 }; // 299:302
+        results[11] = new int[] { 0 }; // 300:303
+        results[12] = new int[] { 0 }; // 301:304
+        results[13] = new int[] { 0 }; // 350:353
+        results[14] = new int[] { 0 }; // 425:428
+
+        Long[] map = MAP_POINTS.clone();
+        ArrayUtils.reverse(map);
+        RangeMap rangeMap = getIntegerRangeMap(map);
+
+        executeFieldRangePartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES, 4, results, 3,
+                END_FIELD);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/LongDescBinaryComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/LongDescBinaryComparatorFactory.java
new file mode 100644
index 0000000..f72cdcd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/LongDescBinaryComparatorFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import static org.apache.hyracks.data.std.primitive.LongPointable.getLong;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import org.apache.hyracks.data.std.util.DataUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public final class LongDescBinaryComparatorFactory implements IBinaryComparatorFactory {
+
+    private static final long serialVersionUID = 1L;
+    public static final LongDescBinaryComparatorFactory INSTANCE = new LongDescBinaryComparatorFactory();
+
+    private LongDescBinaryComparatorFactory() {
+    }
+
+    @Override
+    public IBinaryComparator createBinaryComparator() {
+        return new IBinaryComparator() {
+            @Override
+            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                DataUtils.ensureLengths(8, l1, l2);
+                return -Long.compare(getLong(b1, s1), getLong(b2, s2));
+            }
+        };
+    }
+
+    @Override
+    public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException {
+        return registry.getClassIdentifier(getClass(), serialVersionUID);
+    }
+
+    @SuppressWarnings("squid:S1172") // unused parameter
+    public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
+        return INSTANCE;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/pom.xml b/hyracks-fullstack/hyracks/hyracks-tests/pom.xml
index 02f050b..d55fa18 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-tests/pom.xml
@@ -51,5 +51,6 @@
     <module>hyracks-storage-am-lsm-rtree-test</module>
     <module>hyracks-storage-am-lsm-invertedindex-test</module>
     <module>hyracks-storage-am-bloomfilter-test</module>
+    <module>hyracks-dataflow-common-test</module>
   </modules>
 </project>