[NO ISSUE][HYR] Changes to Multi Partition Tests
-User model changes: no
-Storage format changes: no
-Interface changes: no
Details:
-Adds factories that can be used to perform Partition, Following, and Intersect partitioning as well as a set of tests for those factories.
Change-Id: I22cb8e1f4d47e881ab6a655d675dbfac08027764
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4164
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java
new file mode 100644
index 0000000..cefd38a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+abstract class AbstractFieldRangePartitionComputerFactory implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final RangeMapSupplier rangeMapSupplier;
+
+ private final IBinaryComparatorFactory[] comparatorFactories;
+
+ protected final SourceLocation sourceLoc;
+
+ AbstractFieldRangePartitionComputerFactory(RangeMapSupplier rangeMapSupplier,
+ IBinaryComparatorFactory[] comparatorFactories, SourceLocation sourceLoc) {
+ this.sourceLoc = sourceLoc; // reported with RANGEMAP_NOT_FOUND when no range map is supplied
+ this.comparatorFactories = comparatorFactories; // one factory per compared field
+ this.rangeMapSupplier = rangeMapSupplier; // queried each time a partition computer is initialized
+ }
+
+ private IBinaryComparator[] createBinaryComparators() {
+ final IBinaryComparator[] created = new IBinaryComparator[comparatorFactories.length];
+ for (int idx = 0; idx < created.length; idx++) { // one comparator per factory, in factory order
+ created[idx] = comparatorFactories[idx].createBinaryComparator();
+ }
+ return created;
+ }
+
+ private abstract class AbstractFieldRangePartitionComputer {
+ // Task context passed to the range-map helper when initialize() is called.
+ final IHyracksTaskContext taskContext;
+ // Helper that resolves the range map and maps tuples to partitions; used directly by subclasses.
+ final RangeMapPartitionComputer rangeMapPartitionComputer;
+
+ private AbstractFieldRangePartitionComputer(IHyracksTaskContext taskContext) {
+ this.taskContext = taskContext;
+ this.rangeMapPartitionComputer = new RangeMapPartitionComputer();
+ }
+
+ public void initialize() throws HyracksDataException {
+ rangeMapPartitionComputer.initialize(taskContext); // fetches the range map; throws if none is found
+ }
+ }
+
+ abstract class AbstractFieldRangeSinglePartitionComputer extends AbstractFieldRangePartitionComputer
+ implements ITuplePartitionComputer {
+
+ AbstractFieldRangeSinglePartitionComputer(IHyracksTaskContext taskContext) {
+ super(taskContext);
+ }
+ // With a single partition every tuple goes to partition 0 and computePartition() is skipped.
+ @Override
+ public final int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
+ return nParts != 1 ? computePartition(accessor, tIndex, nParts) : 0;
+ }
+ // Subclass hook that picks the partition; partition() calls it only when nParts != 1.
+ protected abstract int computePartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+ throws HyracksDataException;
+ }
+
+ abstract class AbstractFieldRangeMultiPartitionComputer extends AbstractFieldRangePartitionComputer
+ implements ITupleMultiPartitionComputer {
+
+ // Reused across partition() calls: each call clears and returns this same instance.
+ private BitSet result;
+
+ AbstractFieldRangeMultiPartitionComputer(IHyracksTaskContext taskContext) {
+ super(taskContext);
+ }
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ super.initialize();
+ result = result != null ? result : new BitSet(); // keep an existing bit set on re-initialization
+ }
+
+ // Sets the bits for partitions start..end (both inclusive), or only bit 0 when nParts == 1.
+ @Override
+ public final BitSet partition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+ throws HyracksDataException {
+ result.clear();
+ if (nParts == 1) {
+ result.set(0); // single partition: the start/end hooks are not consulted
+ return result;
+ }
+ final int firstPartition = computeStartPartition(accessor, tIndex, nParts);
+ final int lastPartition = computeEndPartition(accessor, tIndex, nParts);
+ result.set(firstPartition, lastPartition + 1); // BitSet.set(from, to) excludes 'to'
+ return result;
+ }
+
+ protected abstract int computeStartPartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+ throws HyracksDataException;
+
+ protected abstract int computeEndPartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+ throws HyracksDataException;
+ }
+
+ final class RangeMapPartitionComputer {
+
+ private RangeMap rangeMap;
+
+ private IBinaryComparator[] comparators;
+
+ protected void initialize(IHyracksTaskContext taskContext) throws HyracksDataException {
+ // The range map is fetched from the supplier on every call and must be present.
+ rangeMap = rangeMapSupplier.getRangeMap(taskContext);
+ if (rangeMap == null) {
+ throw HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND, sourceLoc);
+ }
+ // Comparators are created on the first call only and reused afterwards.
+ comparators = comparators != null ? comparators : createBinaryComparators();
+ }
+
+ int partition(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields, int nParts)
+ throws HyracksDataException {
+ // Find the tuple's range-map slot (a value equal to a split point goes to the slot after it), then map it.
+ return mapRangeMapSlotToPartition(findRangeMapSlot(accessor, tIndex, rangeFields), nParts);
+ }
+
+ int exclusivePartition(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields, int nParts)
+ throws HyracksDataException {
+ // Same as partition(), except a value equal to a split point stays in the slot before that split point.
+ return mapRangeMapSlotToPartition(findRangeMapExclusiveSlot(accessor, tIndex, rangeFields), nParts);
+ }
+
+ private int mapRangeMapSlotToPartition(int slotIndex, int nParts) {
+ // A range map with k split points defines k + 1 slots.
+ final int slotCount = rangeMap.getSplitCount() + 1;
+ // With more slots than partitions, consecutive slots are folded onto the same partition;
+ // otherwise each slot maps to the partition with the same index.
+ final double slotsPerPartition = slotCount > nParts ? (double) slotCount / nParts : 1;
+ return (int) Math.floor(slotIndex / slotsPerPartition);
+ }
+
+ private int findRangeMapSlot(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields)
+ throws HyracksDataException {
+ // Scan the split points in order and stop at the first one the tuple compares strictly below;
+ // a tuple equal to a split point therefore lands in the slot after it.
+ final int splitCount = rangeMap.getSplitCount();
+ int slot = 0;
+ while (slot < splitCount && compareSlotAndFields(accessor, tIndex, rangeFields, slot) >= 0) {
+ slot++;
+ }
+ // slot == splitCount means the tuple is at or past every split point (last slot).
+ return slot;
+ }
+
+ private int findRangeMapExclusiveSlot(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields)
+ throws HyracksDataException {
+ // Scan the split points in order and stop at the first one the tuple compares below or equal to;
+ // a tuple equal to a split point therefore stays in the slot before it.
+ final int splitCount = rangeMap.getSplitCount();
+ int slot = 0;
+ while (slot < splitCount && compareSlotAndFields(accessor, tIndex, rangeFields, slot) > 0) {
+ slot++;
+ }
+ // slot == splitCount means the tuple is strictly past every split point (last slot).
+ return slot;
+ }
+
+ private int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields, int slotNumber)
+ throws HyracksDataException {
+ // Base added to each field start offset to address the field inside the frame buffer:
+ // tuple start offset plus the length of the field slots.
+ final int fieldBase = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength();
+ for (int fieldNum = 0; fieldNum < comparators.length; ++fieldNum) {
+ final int tupleField = rangeFields[fieldNum];
+ final int fieldStart = accessor.getFieldStartOffset(tIndex, tupleField);
+ final int fieldLength = accessor.getFieldEndOffset(tIndex, tupleField) - fieldStart;
+ final int cmp = comparators[fieldNum].compare(accessor.getBuffer().array(), fieldBase + fieldStart,
+ fieldLength, rangeMap.getByteArray(), rangeMap.getStartOffset(fieldNum, slotNumber),
+ rangeMap.getLength(fieldNum, slotNumber));
+ if (cmp != 0) {
+ return cmp; // the first differing field decides the result
+ }
+ }
+ return 0; // every compared field is equal (or there are no comparators)
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangeFollowingPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangeFollowingPartitionComputerFactory.java
new file mode 100644
index 0000000..5a4511b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangeFollowingPartitionComputerFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public final class FieldRangeFollowingPartitionComputerFactory extends AbstractFieldRangePartitionComputerFactory
+ implements ITupleMultiPartitionComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+ // Indexes of the tuple fields that are compared against the range map.
+ private final int[] rangeFields;
+
+ public FieldRangeFollowingPartitionComputerFactory(int[] rangeFields,
+ IBinaryComparatorFactory[] comparatorFactories, RangeMapSupplier rangeMapSupplier,
+ SourceLocation sourceLocation) {
+ super(rangeMapSupplier, comparatorFactories, sourceLocation);
+ this.rangeFields = rangeFields;
+ }
+ // Builds a partitioner that selects the tuple's own range partition and every partition after it.
+ @Override
+ public ITupleMultiPartitionComputer createPartitioner(IHyracksTaskContext taskContext) {
+ return new AbstractFieldRangeMultiPartitionComputer(taskContext) {
+ @Override
+ protected int computeStartPartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+ throws HyracksDataException {
+ return rangeMapPartitionComputer.partition(accessor, tIndex, rangeFields, nParts);
+ }
+
+ @Override
+ protected int computeEndPartition(IFrameTupleAccessor accessor, int tIndex, int nParts) {
+ return nParts - 1; // always extends through the last partition
+ }
+ };
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangeIntersectPartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangeIntersectPartitionComputerFactory.java
new file mode 100644
index 0000000..211a69a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangeIntersectPartitionComputerFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
+public final class FieldRangeIntersectPartitionComputerFactory extends AbstractFieldRangePartitionComputerFactory
+ implements ITupleMultiPartitionComputerFactory {
+
+ private static final long serialVersionUID = 1L;
+ // Tuple fields used to find the first target partition (looked up with partition()).
+ private final int[] startFields;
+ // Tuple fields used to find the last target partition (looked up with exclusivePartition()).
+ private final int[] endFields;
+
+ public FieldRangeIntersectPartitionComputerFactory(int[] startFields, int[] endFields,
+ IBinaryComparatorFactory[] comparatorFactories, RangeMapSupplier rangeMapSupplier,
+ SourceLocation sourceLocation) {
+ super(rangeMapSupplier, comparatorFactories, sourceLocation);
+ this.startFields = startFields;
+ this.endFields = endFields;
+ }
+ // Builds a partitioner that selects every partition from the start fields' one to the end fields' one.
+ @Override
+ public ITupleMultiPartitionComputer createPartitioner(IHyracksTaskContext taskContext) {
+ return new AbstractFieldRangeMultiPartitionComputer(taskContext) {
+ @Override
+ protected int computeStartPartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+ throws HyracksDataException {
+ return rangeMapPartitionComputer.partition(accessor, tIndex, startFields, nParts);
+ }
+
+ @Override
+ protected int computeEndPartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
+ throws HyracksDataException {
+ return rangeMapPartitionComputer.exclusivePartition(accessor, tIndex, endFields, nParts);
+ }
+ };
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
index 1831a5f..0816f2d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -20,92 +20,33 @@
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
-public final class FieldRangePartitionComputerFactory implements ITuplePartitionComputerFactory {
+public final class FieldRangePartitionComputerFactory extends AbstractFieldRangePartitionComputerFactory
+ implements ITuplePartitionComputerFactory {
+
private static final long serialVersionUID = 1L;
+
private final int[] rangeFields;
- private final IBinaryComparatorFactory[] comparatorFactories;
- private final RangeMapSupplier rangeMapSupplier;
- private final SourceLocation sourceLocation;
public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
RangeMapSupplier rangeMapSupplier, SourceLocation sourceLocation) {
+ super(rangeMapSupplier, comparatorFactories, sourceLocation);
this.rangeFields = rangeFields;
- this.rangeMapSupplier = rangeMapSupplier;
- this.comparatorFactories = comparatorFactories;
- this.sourceLocation = sourceLocation;
}
@Override
public ITuplePartitionComputer createPartitioner(IHyracksTaskContext taskContext) {
- final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
- for (int i = 0; i < comparatorFactories.length; ++i) {
- comparators[i] = comparatorFactories[i].createBinaryComparator();
- }
-
- return new ITuplePartitionComputer() {
- private RangeMap rangeMap;
-
+ return new AbstractFieldRangeSinglePartitionComputer(taskContext) {
@Override
- public void initialize() throws HyracksDataException {
- rangeMap = rangeMapSupplier.getRangeMap(taskContext);
- if (rangeMap == null) {
- throw HyracksDataException.create(ErrorCode.RANGEMAP_NOT_FOUND, sourceLocation);
- }
- }
-
- @Override
- public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
- if (nParts == 1) {
- return 0;
- }
- int slotIndex = getRangePartition(accessor, tIndex);
- // Map range partition to node partitions.
- double rangesPerPart = 1;
- if (rangeMap.getSplitCount() + 1 > nParts) {
- rangesPerPart = ((double) rangeMap.getSplitCount() + 1) / nParts;
- }
- return (int) Math.floor(slotIndex / rangesPerPart);
- }
-
- private int getRangePartition(IFrameTupleAccessor accessor, int tIndex) throws HyracksDataException {
- int slotIndex = 0;
- for (int slotNumber = 0; slotNumber < rangeMap.getSplitCount(); ++slotNumber) {
- int c = compareSlotAndFields(accessor, tIndex, slotNumber);
- if (c < 0) {
- return slotIndex;
- }
- slotIndex++;
- }
- return slotIndex;
- }
-
- private int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int slotNumber)
+ protected int computePartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
throws HyracksDataException {
- int c = 0;
- int startOffset = accessor.getTupleStartOffset(tIndex);
- int slotLength = accessor.getFieldSlotsLength();
- for (int fieldNum = 0; fieldNum < comparators.length; ++fieldNum) {
- int fIdx = rangeFields[fieldNum];
- int fStart = accessor.getFieldStartOffset(tIndex, fIdx);
- int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
- c = comparators[fieldNum].compare(accessor.getBuffer().array(), startOffset + slotLength + fStart,
- fEnd - fStart, rangeMap.getByteArray(), rangeMap.getStartOffset(fieldNum, slotNumber),
- rangeMap.getLength(fieldNum, slotNumber));
- if (c != 0) {
- return c;
- }
- }
- return c;
+ return rangeMapPartitionComputer.partition(accessor, tIndex, rangeFields, nParts);
}
-
};
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/pom.xml b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/pom.xml
new file mode 100644
index 0000000..9ffa000
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/pom.xml
@@ -0,0 +1,82 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-dataflow-common-test</artifactId>
+
+ <parent>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-tests</artifactId>
+ <version>0.3.5-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <properties>
+ <root.dir>${basedir}/../../..</root.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-data-std</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-test-support</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java
new file mode 100644
index 0000000..a61cd40
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITupleMultiPartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.accessors.LongBinaryComparatorFactory;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.Assert;
+
+import junit.framework.TestCase;
+
+/**
+ * These tests check the range partitioning types with various interval sizes and range map split points.
+ * For each range type they check the ASCending comparators for intervals with durations of D = 3, and
+ * a range map of the overall range that has been split into N = 4 parts.
+ * The test for the Split type also checks larger intervals and more splits on the range map to make sure it splits
+ * correctly across many partitions, and within single partitions.
+ * <p>
+ * Currently, this test does not support DESCending intervals, and there is no test case that checks which interval
+ * is included in split if the ending point lands on the point between two partitions.
+ * <p>
+ * The map of the partitions, listed as the rangeMap split points in ascending order:
+ * <p>
+ * The following points (X) will be tested for these 4 partitions.
+ * <p>
+ * X -----------X----------XXX----------X----------XXX----------X------------XXX------------X------------ X
+ * -----------------------|-----------------------|-------------------------|--------------------------
+ * <p>
+ * The following points (X) will be tested for these 16 partitions.
+ * <p>
+ * X -----------X----------XXX----------X----------XXX----------X------------XXX------------X------------ X
+ * -----|-----|-----|-----|-----|-----|-----|-----|-----|-----|------|------|------|------|------|-----
+ * <p>
+ * N4 0 )[ 1 )[ 2 )[ 3
+ * N16 0 )[ 1 )[ 2 )[ 3 )[ 4 )[ 5 )[ 6 )[ 7 )[ 8 )[ 9 )[ 10 )[ 11 )[ 12 )[ 13 )[ 14 )[ 15
+ * ASC 0 25 50 75 100 125 150 175 200 225 250 275 300 325 350 375 400
+ * <p>
+ * First and last partitions include all values less than and greater than min and max split points respectively.
+ * <p>
+ * Here are the test points inside EACH_PARTITION:
+ * result index { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
+ * points { 20l, 45l, 70l, 95l, 120l, 145l, 170l, 195l, 220l, 245l, 270l, 295l, 320l, 345l, 370l, 395l };
+ * <p>
+ * PARTITION_EDGE_CASES: Tests points at or near partition boundaries and at the ends of the partition range.
+ * result index { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14 };
+ * points { -25l, 50l, 99l, 100l, 101l, 150l, 199l, 200l, 201l, 250l, 299l, 300l, 301l, 350l, 425l };
+ * <p>
+ * MAP_POINTS: The map of the partitions, listed as the split points.
+ * partitions { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16 };
+ * map { 0l, 25l, 50l, 75l, 100l, 125l, 150l, 175l, 200l, 225l, 250l, 275l, 300l, 325l, 350l, 375l, 400l };
+ * <p>
+ * Both rangeMap partitions and test intervals are end exclusive.
+ * An ascending test interval ending on 200, like (190, 200), is not in partition 8.
+ * <p>
+ * The Following, Intersect, and Partition partitioning is based on Split, Replicate, and Project in
+ * "Processing Interval Joins On Map-Reduce" by Chawda et al.
+ */
+public abstract class AbstractFieldRangeMultiPartitionComputerFactoryTest extends TestCase {
+
+ protected static final Long[] EACH_PARTITION =
+ new Long[] { 20l, 45l, 70l, 95l, 120l, 145l, 170l, 195l, 220l, 245l, 270l, 295l, 320l, 345l, 370l, 395l };
+ protected static final Long[] PARTITION_EDGE_CASES =
+ new Long[] { -25l, 50l, 99l, 100l, 101l, 150l, 199l, 200l, 201l, 250l, 299l, 300l, 301l, 350l, 425l };
+ protected static final Long[] MAP_POINTS =
+ new Long[] { 25l, 50l, 75l, 100l, 125l, 150l, 175l, 200l, 225l, 250l, 275l, 300l, 325l, 350l, 375l };
+ private final Integer64SerializerDeserializer integerSerde = Integer64SerializerDeserializer.INSTANCE;
+ @SuppressWarnings("rawtypes")
+ private final ISerializerDeserializer[] twoIntegerSerDers = new ISerializerDeserializer[] {
+ Integer64SerializerDeserializer.INSTANCE, Integer64SerializerDeserializer.INSTANCE };
+ private final RecordDescriptor recordIntegerDesc = new RecordDescriptor(twoIntegerSerDers);
+ private static final int FRAME_SIZE = 640;
+ private static final int INTEGER_LENGTH = Long.BYTES;
+ static final IBinaryComparatorFactory[] BINARY_ASC_COMPARATOR_FACTORIES =
+ new IBinaryComparatorFactory[] { LongBinaryComparatorFactory.INSTANCE };
+ static final IBinaryComparatorFactory[] BINARY_DESC_COMPARATOR_FACTORIES =
+ new IBinaryComparatorFactory[] { LongDescBinaryComparatorFactory.INSTANCE };
+ protected static final int[] START_FIELD = new int[] { 0 };
+ protected static final int[] END_FIELD = new int[] { 1 };
+
+ private byte[] getIntegerBytes(Long[] integers) throws HyracksDataException {
+ // Serialize the values back to back into one in-memory byte array.
+ try (ByteArrayOutputStream bytes = new ByteArrayOutputStream()) {
+ DataOutput out = new DataOutputStream(bytes);
+ for (Long value : integers) {
+ integerSerde.serialize(value, out);
+ }
+ return bytes.toByteArray();
+ } catch (IOException e) {
+ // Wrap I/O failures in the exception type declared by this helper.
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ protected RangeMap getIntegerRangeMap(Long[] integers) throws HyracksDataException {
+ final int[] endOffsets = new int[integers.length];
+ for (int i = 0, end = INTEGER_LENGTH; i < endOffsets.length; i++, end += INTEGER_LENGTH) {
+ endOffsets[i] = end; // equals (i + 1) * INTEGER_LENGTH
+ }
+ return new RangeMap(1, getIntegerBytes(integers), endOffsets);
+ }
+
+ private ByteBuffer prepareData(IHyracksTaskContext ctx, Long[] startPoints, Long duration)
+ throws HyracksDataException {
+ IFrame frame = new VSizeFrame(ctx);
+ // Writes one two-field tuple per start point into this single frame and returns the frame's buffer.
+ FrameTupleAppender appender = new FrameTupleAppender();
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(recordIntegerDesc.getFieldCount());
+ DataOutput dos = tb.getDataOutput();
+ appender.reset(frame, true);
+ // Field 0 is the start point; field 1 is the start point plus duration.
+ for (int i = 0; i < startPoints.length; ++i) {
+ tb.reset();
+ integerSerde.serialize(startPoints[i], dos);
+ tb.addFieldEndOffset();
+ integerSerde.serialize(startPoints[i] + duration, dos);
+ tb.addFieldEndOffset();
+ appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()); // NOTE(review): any result of append(...) is ignored; presumably all tuples fit in one frame — confirm
+ }
+ return frame.getBuffer();
+ }
+
+ protected void executeFieldRangeFollowingPartitionTests(Long[] integers, RangeMap rangeMap,
+ IBinaryComparatorFactory[] minComparatorFactories, int nParts, int[][] results, long duration,
+ int[] rangeFields) throws HyracksDataException {
+
+ // Build the Following factory under test over a static supplier for the given range map,
+ // using a fixed (0, 0) source location.
+ ITupleMultiPartitionComputerFactory followingFactory =
+ new FieldRangeFollowingPartitionComputerFactory(rangeFields, minComparatorFactories,
+ new StaticRangeMapSupplier(rangeMap), new SourceLocation(0, 0));
+
+ // Run the shared multi-partition check against the expected results.
+ executeFieldRangeMultiPartitionTests(integers, followingFactory, nParts, results, duration);
+ }
+
+ protected void executeFieldRangeIntersectPartitionTests(Long[] integers, RangeMap rangeMap,
+ IBinaryComparatorFactory[] minComparatorFactories, int nParts, int[][] results, long duration,
+ int[] startFields, int[] endFields) throws HyracksDataException {
+
+ // Build the Intersect factory under test over a static supplier for the given range map.
+ ITupleMultiPartitionComputerFactory intersectFactory =
+ new FieldRangeIntersectPartitionComputerFactory(startFields, endFields, minComparatorFactories,
+ new StaticRangeMapSupplier(rangeMap), new SourceLocation(0, 0));
+
+ // Run the shared multi-partition check against the expected results.
+ executeFieldRangeMultiPartitionTests(integers, intersectFactory, nParts, results, duration);
+ }
+
+ protected void executeFieldRangePartitionTests(Long[] integers, RangeMap rangeMap,
+ IBinaryComparatorFactory[] minComparatorFactories, int nParts, int[][] results, long duration,
+ int[] rangeFields) throws HyracksDataException {
+
+ StaticRangeMapSupplier rangeMapSupplier = new StaticRangeMapSupplier(rangeMap);
+ SourceLocation sourceLocation = new SourceLocation(0, 0);
+
+ ITuplePartitionComputerFactory itpcf = new FieldRangePartitionComputerFactory(rangeFields,
+ minComparatorFactories, rangeMapSupplier, sourceLocation);
+
+ executeFieldRangePartitionTests(integers, itpcf, nParts, results, duration);
+
+ }
+
+ protected void executeFieldRangeMultiPartitionTests(Long[] integers, ITupleMultiPartitionComputerFactory itmpcf,
+ int nParts, int[][] results, long duration) throws HyracksDataException {
+ IHyracksTaskContext ctx = TestUtils.create(FRAME_SIZE);
+
+ ITupleMultiPartitionComputer partitioner = itmpcf.createPartitioner(ctx);
+ partitioner.initialize();
+
+ IFrameTupleAccessor accessor = new FrameTupleAccessor(recordIntegerDesc);
+ ByteBuffer buffer = prepareData(ctx, integers, duration);
+ accessor.reset(buffer);
+
+ for (int i = 0; i < results.length; ++i) {
+ BitSet map = partitioner.partition(accessor, i, nParts);
+ checkPartitionResult(results[i], map);
+ }
+ }
+
+ protected void executeFieldRangePartitionTests(Long[] integers, ITuplePartitionComputerFactory itpcf, int nParts,
+ int[][] results, long duration) throws HyracksDataException {
+ IHyracksTaskContext ctx = TestUtils.create(FRAME_SIZE);
+
+ ITuplePartitionComputer partitioner = itpcf.createPartitioner(ctx);
+ partitioner.initialize();
+
+ IFrameTupleAccessor accessor = new FrameTupleAccessor(recordIntegerDesc);
+ ByteBuffer buffer = prepareData(ctx, integers, duration);
+ accessor.reset(buffer);
+
+ int partition;
+
+ for (int i = 0; i < results.length; ++i) {
+ partition = partitioner.partition(accessor, i, nParts);
+ Assert.assertEquals("The partitions do not match for test " + i + ".", results[i][0], partition);
+ }
+ }
+
+ private String getString(int[] results) {
+ String result = "[";
+ for (int i = 0; i < results.length; ++i) {
+ result += results[i];
+ if (i < results.length - 1) {
+ result += ", ";
+ }
+ }
+ result += "]";
+ return result;
+ }
+
+ private String getString(BitSet results) {
+ int count = 0;
+ String result = "[";
+ for (int i = 0; i < results.length(); ++i) {
+ if (results.get(i)) {
+ if (count > 0) {
+ result += ", ";
+ }
+ result += i;
+ count++;
+ }
+ }
+ result += "]";
+ return result;
+ }
+
+ private void checkPartitionResult(int[] results, BitSet map) { // same bit count, and every expected bit set
+     Assert.assertTrue("The number of partitions in the Bitset:(" + map.cardinality() + ") and the results:("
+             + results.length + ") do not match.", map.cardinality() == results.length);
+     for (int expectedPartition : results) {
+         Assert.assertTrue("The map partition " + getString(map) + " and the results " + getString(results)
+                 + " do not match. 2.", map.get(expectedPartition));
+     }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangeFollowingPartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangeFollowingPartitionComputerFactoryTest.java
new file mode 100644
index 0000000..a49124a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangeFollowingPartitionComputerFactoryTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Test;
+
+ /** Expected partition sets of the "following" range partitioner for the edge-case tuples (row comment = start:end). */
+ public class FieldRangeFollowingPartitionComputerFactoryTest
+         extends AbstractFieldRangeMultiPartitionComputerFactoryTest {
+
+     @Test
+     public void testFRMPCF_Replicate_ASC_D3_N4_EDGE() throws HyracksDataException {
+         int[][] results = new int[15][];
+         results[0] = new int[] { 0, 1, 2, 3 }; // -25:-22
+         results[1] = new int[] { 0, 1, 2, 3 }; // 50:53
+         results[2] = new int[] { 0, 1, 2, 3 }; // 99:102
+         results[3] = new int[] { 1, 2, 3 }; // 100:103
+         results[4] = new int[] { 1, 2, 3 }; // 101:104
+         results[5] = new int[] { 1, 2, 3 }; // 150:153
+         results[6] = new int[] { 1, 2, 3 }; // 199:202
+         results[7] = new int[] { 2, 3 }; // 200:203
+         results[8] = new int[] { 2, 3 }; // 201:204
+         results[9] = new int[] { 2, 3 }; // 250:253
+         results[10] = new int[] { 2, 3 }; // 299:302
+         results[11] = new int[] { 3 }; // 300:303
+         results[12] = new int[] { 3 }; // 301:304
+         results[13] = new int[] { 3 }; // 350:353
+         results[14] = new int[] { 3 }; // 425:428
+
+         RangeMap rangeMap = getIntegerRangeMap(MAP_POINTS);
+
+         executeFieldRangeFollowingPartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES, 4,
+                 results, 3, START_FIELD);
+     }
+
+     @Test
+     public void testFRMPCF_Replicate_DESC_D3_N4_EDGE() throws HyracksDataException {
+         int[][] results = new int[15][];
+         results[0] = new int[] { 3 }; // -25:-22
+         results[1] = new int[] { 3 }; // 50:53
+         results[2] = new int[] { 2, 3 }; // 99:102
+         results[3] = new int[] { 2, 3 }; // 100:103
+         results[4] = new int[] { 2, 3 }; // 101:104
+         results[5] = new int[] { 2, 3 }; // 150:153
+         results[6] = new int[] { 1, 2, 3 }; // 199:202
+         results[7] = new int[] { 1, 2, 3 }; // 200:203
+         results[8] = new int[] { 1, 2, 3 }; // 201:204
+         results[9] = new int[] { 1, 2, 3 }; // 250:253
+         results[10] = new int[] { 0, 1, 2, 3 }; // 299:302
+         results[11] = new int[] { 0, 1, 2, 3 }; // 300:303
+         results[12] = new int[] { 0, 1, 2, 3 }; // 301:304
+         results[13] = new int[] { 0, 1, 2, 3 }; // 350:353
+         results[14] = new int[] { 0, 1, 2, 3 }; // 425:428
+
+         Long[] map = MAP_POINTS.clone();
+         ArrayUtils.reverse(map);
+         RangeMap rangeMap = getIntegerRangeMap(map);
+
+         executeFieldRangeFollowingPartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES, 4,
+                 results, 3, END_FIELD);
+     }
+ }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangeIntersectPartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangeIntersectPartitionComputerFactoryTest.java
new file mode 100644
index 0000000..edf9d7a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangeIntersectPartitionComputerFactoryTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Test;
+
+public class FieldRangeIntersectPartitionComputerFactoryTest
+ extends AbstractFieldRangeMultiPartitionComputerFactoryTest {
+
+ @Test
+ public void testFRMPCF_Split_ASC_D3_N4_EDGE() throws HyracksDataException {
+ int[][] results = new int[15][];
+ results[0] = new int[] { 0 }; // -25:-22
+ results[1] = new int[] { 0 }; // 50:53
+ results[2] = new int[] { 0, 1 }; // 99:102
+ results[3] = new int[] { 1 }; // 100:103
+ results[4] = new int[] { 1 }; // 101:104
+ results[5] = new int[] { 1 }; // 150:153
+ results[6] = new int[] { 1, 2 }; // 199:202
+ results[7] = new int[] { 2 }; // 200:203
+ results[8] = new int[] { 2 }; // 201:204
+ results[9] = new int[] { 2 }; // 250:253
+ results[10] = new int[] { 2, 3 }; // 299:302
+ results[11] = new int[] { 3 }; // 300:303
+ results[12] = new int[] { 3 }; // 301:304
+ results[13] = new int[] { 3 }; // 350:353
+ results[14] = new int[] { 3 }; // 425:428
+
+ RangeMap rangeMap = getIntegerRangeMap(MAP_POINTS);
+
+ executeFieldRangeIntersectPartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES, 4,
+ results, 3, START_FIELD, END_FIELD);
+ }
+
+ @Test
+ public void testFRMPCF_Split_DESC_D3_N4_EDGE() throws HyracksDataException {
+ int[][] results = new int[15][];
+ results[0] = new int[] { 3 }; // -25:-22
+ results[1] = new int[] { 3 }; // 50:53
+ results[2] = new int[] { 2, 3 }; // 99:102
+ results[3] = new int[] { 2 }; // 100:103
+ results[4] = new int[] { 2 }; // 101:104
+ results[5] = new int[] { 2 }; // 150:153
+ results[6] = new int[] { 1, 2 }; // 199:202
+ results[7] = new int[] { 1 }; // 200:203
+ results[8] = new int[] { 1 }; // 201:204
+ results[9] = new int[] { 1 }; // 250:253
+ results[10] = new int[] { 0, 1 }; // 299:302
+ results[11] = new int[] { 0 }; // 300:303
+ results[12] = new int[] { 0 }; // 301:304
+ results[13] = new int[] { 0 }; // 350:353
+ results[14] = new int[] { 0 }; // 425:428
+
+ Long[] map = MAP_POINTS.clone();
+ ArrayUtils.reverse(map);
+ RangeMap rangeMap = getIntegerRangeMap(map);
+
+ executeFieldRangeIntersectPartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES, 4,
+ results, 3, END_FIELD, START_FIELD);
+ }
+
+ @Test
+ public void testFRMPCF_Split_ASC_D50_N16_EDGE() throws HyracksDataException {
+ int[][] results = new int[15][];
+ results[0] = new int[] { 0 }; // -25:25
+ results[1] = new int[] { 2, 3 }; // 50:100
+ results[2] = new int[] { 3, 4, 5 }; // 99:149
+ results[3] = new int[] { 4, 5 }; // 100:150
+ results[4] = new int[] { 4, 5, 6 }; // 101:151
+ results[5] = new int[] { 6, 7 }; // 150:200
+ results[6] = new int[] { 7, 8, 9 }; // 199:249
+ results[7] = new int[] { 8, 9 }; // 200:250
+ results[8] = new int[] { 8, 9, 10 }; // 201:251
+ results[9] = new int[] { 10, 11 }; // 250:300
+ results[10] = new int[] { 11, 12, 13 }; // 299:349
+ results[11] = new int[] { 12, 13 }; // 300:350
+ results[12] = new int[] { 12, 13, 14 }; // 301:351
+ results[13] = new int[] { 14, 15 }; // 350:400
+ results[14] = new int[] { 15 }; // 425:475
+
+ RangeMap rangeMap = getIntegerRangeMap(MAP_POINTS);
+
+ executeFieldRangeIntersectPartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES, 16,
+ results, 50, START_FIELD, END_FIELD);
+ }
+
+ @Test
+ public void testFRMPCF_Split_DESC_D50_N16_EDGE() throws HyracksDataException {
+ int[][] results = new int[15][];
+ results[0] = new int[] { 15 }; // -25:25
+ results[1] = new int[] { 12, 13 }; // 50:100
+ results[2] = new int[] { 10, 11, 12 }; // 99:149
+ results[3] = new int[] { 10, 11 }; // 100:150
+ results[4] = new int[] { 9, 10, 11 }; // 101:151
+ results[5] = new int[] { 8, 9 }; // 150:200
+ results[6] = new int[] { 6, 7, 8 }; // 199:249
+ results[7] = new int[] { 6, 7 }; // 200:250
+ results[8] = new int[] { 5, 6, 7 }; // 201:251
+ results[9] = new int[] { 4, 5 }; // 250:300
+ results[10] = new int[] { 2, 3, 4 }; // 299:349
+ results[11] = new int[] { 2, 3 }; // 300:350
+ results[12] = new int[] { 1, 2, 3 }; // 301:351
+ results[13] = new int[] { 0, 1 }; // 350:400
+ results[14] = new int[] { 0 }; // 425:475
+
+ Long[] map = MAP_POINTS.clone();
+ ArrayUtils.reverse(map);
+ RangeMap rangeMap = getIntegerRangeMap(map);
+
+ executeFieldRangeIntersectPartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES, 16,
+ results, 50, END_FIELD, START_FIELD);
+ }
+
+ @Test
+ public void testFRMPCF_Split_ASC_D3_N16_EACH() throws HyracksDataException {
+ int[][] results = new int[16][];
+ results[0] = new int[] { 0 }; // 20:23
+ results[1] = new int[] { 1 }; // 45:48
+ results[2] = new int[] { 2 }; // 70:73
+ results[3] = new int[] { 3 }; // 95:98
+ results[4] = new int[] { 4 }; // 120:123
+ results[5] = new int[] { 5 }; // 145:148
+ results[6] = new int[] { 6 }; // 170:173
+ results[7] = new int[] { 7 }; // 195:198
+ results[8] = new int[] { 8 }; // 220:223
+ results[9] = new int[] { 9 }; // 245:248
+ results[10] = new int[] { 10 }; // 270:273
+ results[11] = new int[] { 11 }; // 295:298
+ results[12] = new int[] { 12 }; // 320:323
+ results[13] = new int[] { 13 }; // 345:348
+ results[14] = new int[] { 14 }; // 370:373
+ results[15] = new int[] { 15 }; // 395:398
+
+ RangeMap rangeMap = getIntegerRangeMap(MAP_POINTS);
+
+ executeFieldRangeIntersectPartitionTests(EACH_PARTITION, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES, 16, results,
+ 3, START_FIELD, END_FIELD);
+ }
+
+ @Test
+ public void testFRMPCF_Split_DESC_D3_N16_EACH() throws HyracksDataException {
+ int[][] results = new int[16][];
+ results[0] = new int[] { 15 }; // 20:23
+ results[1] = new int[] { 14 }; // 45:48
+ results[2] = new int[] { 13 }; // 70:73
+ results[3] = new int[] { 12 }; // 95:98
+ results[4] = new int[] { 11 }; // 120:123
+ results[5] = new int[] { 10 }; // 145:148
+ results[6] = new int[] { 9 }; // 170:173
+ results[7] = new int[] { 8 }; // 195:198
+ results[8] = new int[] { 7 }; // 220:223
+ results[9] = new int[] { 6 }; // 245:248
+ results[10] = new int[] { 5 }; // 270:273
+ results[11] = new int[] { 4 }; // 295:298
+ results[12] = new int[] { 3 }; // 320:323
+ results[13] = new int[] { 2 }; // 345:348
+ results[14] = new int[] { 1 }; // 370:373
+ results[15] = new int[] { 0 }; // 395:398
+
+ Long[] map = MAP_POINTS.clone();
+ ArrayUtils.reverse(map);
+ RangeMap rangeMap = getIntegerRangeMap(map);
+
+ executeFieldRangeIntersectPartitionTests(EACH_PARTITION, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES, 16,
+ results, 3, END_FIELD, START_FIELD);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactoryTest.java
new file mode 100644
index 0000000..3efe989
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactoryTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.Test;
+
+ /** Expected single partition of the range partitioner for the edge-case tuples (row comment = start:end). */
+ public class FieldRangePartitionComputerFactoryTest extends AbstractFieldRangeMultiPartitionComputerFactoryTest {
+
+     @Test
+     public void testFRMPCF_Project_ASC_D3_N4_EDGE() throws HyracksDataException {
+         int[][] results = new int[15][];
+         results[0] = new int[] { 0 }; // -25:-22
+         results[1] = new int[] { 0 }; // 50:53
+         results[2] = new int[] { 0 }; // 99:102
+         results[3] = new int[] { 1 }; // 100:103
+         results[4] = new int[] { 1 }; // 101:104
+         results[5] = new int[] { 1 }; // 150:153
+         results[6] = new int[] { 1 }; // 199:202
+         results[7] = new int[] { 2 }; // 200:203
+         results[8] = new int[] { 2 }; // 201:204
+         results[9] = new int[] { 2 }; // 250:253
+         results[10] = new int[] { 2 }; // 299:302
+         results[11] = new int[] { 3 }; // 300:303
+         results[12] = new int[] { 3 }; // 301:304
+         results[13] = new int[] { 3 }; // 350:353
+         results[14] = new int[] { 3 }; // 425:428
+
+         RangeMap rangeMap = getIntegerRangeMap(MAP_POINTS);
+
+         executeFieldRangePartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_ASC_COMPARATOR_FACTORIES, 4, results, 3,
+                 START_FIELD);
+     }
+
+     @Test
+     public void testFRMPCF_Project_DESC_D3_N4_EDGE() throws HyracksDataException {
+         int[][] results = new int[15][];
+         results[0] = new int[] { 3 }; // -25:-22
+         results[1] = new int[] { 3 }; // 50:53
+         results[2] = new int[] { 2 }; // 99:102
+         results[3] = new int[] { 2 }; // 100:103
+         results[4] = new int[] { 2 }; // 101:104
+         results[5] = new int[] { 2 }; // 150:153
+         results[6] = new int[] { 1 }; // 199:202
+         results[7] = new int[] { 1 }; // 200:203
+         results[8] = new int[] { 1 }; // 201:204
+         results[9] = new int[] { 1 }; // 250:253
+         results[10] = new int[] { 0 }; // 299:302
+         results[11] = new int[] { 0 }; // 300:303
+         results[12] = new int[] { 0 }; // 301:304
+         results[13] = new int[] { 0 }; // 350:353
+         results[14] = new int[] { 0 }; // 425:428
+
+         Long[] map = MAP_POINTS.clone();
+         ArrayUtils.reverse(map);
+         RangeMap rangeMap = getIntegerRangeMap(map);
+
+         executeFieldRangePartitionTests(PARTITION_EDGE_CASES, rangeMap, BINARY_DESC_COMPARATOR_FACTORIES, 4, results, 3,
+                 END_FIELD);
+     }
+ }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/LongDescBinaryComparatorFactory.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/LongDescBinaryComparatorFactory.java
new file mode 100644
index 0000000..f72cdcd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/LongDescBinaryComparatorFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition.range;
+
+import static org.apache.hyracks.data.std.primitive.LongPointable.getLong;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import org.apache.hyracks.data.std.util.DataUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+ /** Factory for comparators that order serialized 8-byte longs from largest to smallest. */
+ public final class LongDescBinaryComparatorFactory implements IBinaryComparatorFactory {
+     private static final long serialVersionUID = 1L;
+     public static final LongDescBinaryComparatorFactory INSTANCE = new LongDescBinaryComparatorFactory();
+
+     private LongDescBinaryComparatorFactory() {} // private: INSTANCE is the only instance created in this class
+
+     @Override
+     public IBinaryComparator createBinaryComparator() {
+         return new IBinaryComparator() {
+             @Override
+             public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+                 DataUtils.ensureLengths(Long.BYTES, l1, l2);
+                 final long first = getLong(b1, s1); // read the left operand first, as before
+                 return Long.compare(getLong(b2, s2), first); // operands swapped: larger values sort first
+             }
+         };
+     }
+
+     @Override
+     public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException {
+         return registry.getClassIdentifier(getClass(), serialVersionUID);
+     }
+
+     @SuppressWarnings("squid:S1172") // unused parameter
+     public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
+         return INSTANCE;
+     }
+ }
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/pom.xml b/hyracks-fullstack/hyracks/hyracks-tests/pom.xml
index 02f050b..d55fa18 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-tests/pom.xml
@@ -51,5 +51,6 @@
<module>hyracks-storage-am-lsm-rtree-test</module>
<module>hyracks-storage-am-lsm-invertedindex-test</module>
<module>hyracks-storage-am-bloomfilter-test</module>
+ <module>hyracks-dataflow-common-test</module>
</modules>
</project>