[NO ISSUE][RT] Percentage List RangeMap
-user model changes: no
-storage formate changes: no
-interface changes: no
Details:
- Dynamic RangeMaps include a percentage list used for
partitioning.
Change-Id: Ie08773688111965ad820d8993fcb886a0aecdcb6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7903
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.23.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.23.query.sqlpp
index a08ad42..7c534a8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.23.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.23.query.sqlpp
@@ -27,4 +27,4 @@
SELECT four, ten div 4 as two,
sum(ten div 4) over (partition by four order by ten div 4 rows between unbounded preceding and current row) as `sum`,
last_value(ten div 4) over (partition by four order by ten div 4 rows between unbounded preceding and current row) as `last_value`
-ORDER BY four, ten div 4
+ORDER BY four, ten div 4, `sum`
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.74.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.74.query.sqlpp
index a251721..b5c7466 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.74.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.74.query.sqlpp
@@ -26,4 +26,4 @@
FROM tenk1
WHERE unique2 < 10
SELECT lag(ten) OVER (PARTITION BY four ORDER BY ten) AS lag, ten, four
-ORDER BY four, ten
+ORDER BY four, ten, lag
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.75.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.75.query.sqlpp
index d6d27ab..47f9ca4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.75.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.75.query.sqlpp
@@ -26,4 +26,4 @@
FROM tenk1 t
WHERE t.unique2 < 10
SELECT lag(t.ten) OVER (PARTITION BY t.four ORDER BY t.ten) AS lag, t.ten, t.four
-ORDER BY t.four, t.ten
+ORDER BY t.four, t.ten, lag
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.76.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.76.query.sqlpp
index ba9be1e..44683bd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.76.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.76.query.sqlpp
@@ -26,4 +26,4 @@
FROM tenk1
WHERE unique2 < 10
SELECT lag(ten, four) OVER (PARTITION BY four ORDER BY ten) AS lag, ten, four
-ORDER BY four, ten
\ No newline at end of file
+ORDER BY four, ten, lag
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.77.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.77.query.sqlpp
index 2081002..98e9b9e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.77.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.77.query.sqlpp
@@ -26,5 +26,5 @@
FROM tenk1
WHERE unique2 < 10
SELECT lag(ten, four, 0) OVER (PARTITION BY four ORDER BY ten) AS lag, ten, four
-ORDER BY four, ten
+ORDER BY four, ten, lag
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.78.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.78.query.sqlpp
index 518516b..1cd7a2d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.78.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.78.query.sqlpp
@@ -26,4 +26,4 @@
FROM tenk1
WHERE unique2 < 10
SELECT lead(ten) OVER (PARTITION BY four ORDER BY ten) AS lead, ten, four
-ORDER BY four, ten
\ No newline at end of file
+ORDER BY four, ten, lead
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.79.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.79.query.sqlpp
index 37ba499..b0c09b7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.79.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.79.query.sqlpp
@@ -26,4 +26,4 @@
FROM tenk1
WHERE unique2 < 10
SELECT lead(ten * 2, 1) OVER (PARTITION BY four ORDER BY ten) AS lead, ten, four
-ORDER BY four, ten
\ No newline at end of file
+ORDER BY four, ten, lead
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.80.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.80.query.sqlpp
index 226a854..509bcea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.80.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.80.query.sqlpp
@@ -26,4 +26,4 @@
FROM tenk1
WHERE unique2 < 10
SELECT lead(ten * 2, 1, -1) OVER (PARTITION BY four ORDER BY ten) AS lead, ten, four
-ORDER BY four, ten
\ No newline at end of file
+ORDER BY four, ten, lead
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.88.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.88.query.sqlpp
index 08c7446..3a7c8b2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.88.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.88.query.sqlpp
@@ -27,4 +27,4 @@
SELECT four, ten div 4 as two,
sum(ten div 4) over (partition by four order by ten div 4 rows between unbounded preceding and current row) AS `sum`,
last_value(ten div 4) over (partition by four order by ten div 4 rows between unbounded preceding and current row) AS `last_value`
-ORDER BY four, ten div 4
+ORDER BY four, ten div 4, `sum`
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java
index 2d749d2..da00cfd 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java
@@ -94,7 +94,7 @@
// TODO Add support for composite fields.
}
- return new RangeMap(1, abvs.getByteArray(), offsets);
+ return new RangeMap(1, abvs.getByteArray(), offsets, null);
}
@SuppressWarnings("unchecked")
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
index 2338841..2084e48 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
@@ -55,6 +55,7 @@
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import org.apache.hyracks.dataflow.common.data.marshalling.ByteArraySerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.DoubleArraySerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.IntArraySerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
@@ -145,6 +146,8 @@
private final Comparator<List<byte[]>> comparator;
private final int numOfPartitions;
private final int numOrderByFields;
+ private final int[] splitPoints;
+ private final double[] percentages;
@SuppressWarnings("unchecked")
private RangeMapFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context, boolean[] ascending,
@@ -155,6 +158,8 @@
this.comparator = createComparator(ascending, argsTypes);
this.numOfPartitions = numOfPartitions;
this.numOrderByFields = numOrderByFields;
+ this.splitPoints = new int[numOfPartitions - 1];
+ this.percentages = new double[numOfPartitions - 1];
}
@Override
@@ -214,26 +219,18 @@
}
} else {
finalSamples.sort(comparator);
- // divide the samples evenly and pick the boundaries as split points
- int nextSplitOffset = (int) Math.ceil(finalSamples.size() / (double) numOfPartitions);
- int nextSplitIndex = nextSplitOffset - 1;
- int endOffsetsCounter = 0;
- int numRequiredSplits = numOfPartitions - 1;
- endOffsets = new int[numRequiredSplits * numOrderByFields];
+ calculateSplitIndexes();
+ calculatePercentSplit();
+
List<byte[]> sample;
- for (int split = 1; split <= numRequiredSplits; split++) {
- // pick the split point from sorted samples (could be <3> or <4,"John"> if it's multi-column)
- sample = finalSamples.get(nextSplitIndex);
- for (int column = 0; column < sample.size(); column++) {
- allSplitValuesOut.write(sample.get(column));
+ int endOffsetsCounter = 0;
+ endOffsets = new int[splitPoints.length * numOrderByFields];
+ for (int i = 0; i < splitPoints.length; i++) {
+ sample = finalSamples.get(splitPoints[i]);
+ for (byte[] column : sample) {
+ allSplitValuesOut.write(column);
endOffsets[endOffsetsCounter++] = storage.getLength();
}
- // go to the next split point
- nextSplitIndex += nextSplitOffset;
- // in case we go beyond the boundary of samples, we pick the last sample repeatedly
- if (nextSplitIndex >= finalSamples.size()) {
- nextSplitIndex = finalSamples.size() - 1;
- }
}
}
} catch (IOException e) {
@@ -242,6 +239,54 @@
serializeRangeMap(numOrderByFields, storage.getByteArray(), endOffsets, result);
}
+ private void calculateSplitIndexes() {
+ int nextSplitOffset = (int) Math.ceil(finalSamples.size() / (double) numOfPartitions);
+ int nextSplitIndex = nextSplitOffset - 1;
+
+ for (int split = 0; split < splitPoints.length; split++) {
+ splitPoints[split] = nextSplitIndex;
+ nextSplitIndex += nextSplitOffset;
+ // in case we go beyond the boundary of samples, we pick the last sample repeatedly
+ if (nextSplitIndex >= finalSamples.size()) {
+ nextSplitIndex = finalSamples.size() - 1;
+ }
+ }
+ }
+
+ private void calculatePercentSplit() {
+ for (int i = 0; i < splitPoints.length; i++) {
+ List<byte[]> sampleAtSplit = finalSamples.get(splitPoints[i]);
+ int smallestIndexEqualToSample = splitPoints[i];
+ while (smallestIndexEqualToSample >= 0
+ && comparator.compare(sampleAtSplit, finalSamples.get(smallestIndexEqualToSample)) == 0) {
+ smallestIndexEqualToSample--;
+ }
+ smallestIndexEqualToSample++;
+
+ int largestSplitIncludingSample = i;
+ while (largestSplitIncludingSample < splitPoints.length && comparator.compare(sampleAtSplit,
+ finalSamples.get(splitPoints[largestSplitIncludingSample])) == 0) {
+ largestSplitIncludingSample++;
+ }
+ largestSplitIncludingSample--;
+
+ int largestIndexEqualToSample = splitPoints[largestSplitIncludingSample];
+ while (largestIndexEqualToSample < finalSamples.size()
+ && comparator.compare(sampleAtSplit, finalSamples.get(largestIndexEqualToSample)) == 0) {
+ largestIndexEqualToSample++;
+ }
+ largestIndexEqualToSample--;
+
+ double count = largestIndexEqualToSample - smallestIndexEqualToSample + 1;
+ double waterMark = smallestIndexEqualToSample - 1;
+ for (int j = i; j <= largestSplitIncludingSample; j++) {
+ percentages[j] = ((splitPoints[j] - waterMark) * 100) / (count);
+ waterMark = splitPoints[j];
+ }
+ i = largestSplitIncludingSample;
+ }
+ }
+
@Override
public void finishPartial(IPointable result) throws HyracksDataException {
finish(result);
@@ -297,6 +342,7 @@
IntegerSerializerDeserializer.write(numberFields, serRangeMap.getDataOutput());
ByteArraySerializerDeserializer.write(splitValues, serRangeMap.getDataOutput());
IntArraySerializerDeserializer.write(endOffsets, serRangeMap.getDataOutput());
+ DoubleArraySerializerDeserializer.write(percentages, serRangeMap.getDataOutput());
binary.setValue(serRangeMap.getByteArray(), serRangeMap.getStartOffset(), serRangeMap.getLength());
storage.reset();
binarySerde.serialize(binary, storage.getDataOutput());
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
index 9c80f4c..5d943c8b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -78,8 +78,9 @@
public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
Pair<int[], IBinaryComparatorFactory[]> pOrderColumns = createOrderColumnsAndComparators(op, opSchema, context);
- FieldRangePartitionComputerFactory partitionerFactory = new FieldRangePartitionComputerFactory(
- pOrderColumns.first, pOrderColumns.second, crateRangeMapSupplier(), op.getSourceLocation());
+ FieldRangePartitionComputerFactory partitionerFactory =
+ new FieldRangePartitionComputerFactory(pOrderColumns.first, pOrderColumns.second,
+ crateRangeMapSupplier(), op.getSourceLocation(), rangeMapIsComputedAtRunTime);
IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, partitionerFactory);
return new Pair<>(conn, null);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/marshalling/DoubleArraySerializerDeserializer.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/marshalling/DoubleArraySerializerDeserializer.java
new file mode 100644
index 0000000..b8a2b1b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/marshalling/DoubleArraySerializerDeserializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.marshalling;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class DoubleArraySerializerDeserializer implements ISerializerDeserializer<double[]> {
+ private static final long serialVersionUID = 1L;
+
+ public static final DoubleArraySerializerDeserializer INSTANCE = new DoubleArraySerializerDeserializer();
+
+ private DoubleArraySerializerDeserializer() {
+ }
+
+ @Override
+ public double[] deserialize(DataInput in) throws HyracksDataException {
+ return read(in);
+ }
+
+ @Override
+ public void serialize(double[] instance, DataOutput out) throws HyracksDataException {
+ write(instance, out);
+ }
+
+ public static double[] read(DataInput in) throws HyracksDataException {
+ try {
+ int len = in.readInt();
+ double[] array = new double[len];
+ for (int i = 0; i < array.length; ++i) {
+ array[i] = in.readDouble();
+ }
+ return array;
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public static void write(double[] instance, DataOutput out) throws HyracksDataException {
+ try {
+ out.writeInt(instance.length);
+ for (int i = 0; i < instance.length; ++i) {
+ out.writeDouble(instance[i]);
+ }
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java
index cefd38a..f176469 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java
@@ -21,6 +21,8 @@
import java.io.Serializable;
import java.util.BitSet;
+import java.util.Random;
+import java.util.function.Supplier;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -61,11 +63,10 @@
final IHyracksTaskContext taskContext;
- final RangeMapPartitionComputer rangeMapPartitionComputer;
+ RangeMapPartitionComputer rangeMapPartitionComputer;
private AbstractFieldRangePartitionComputer(IHyracksTaskContext taskContext) {
this.taskContext = taskContext;
- this.rangeMapPartitionComputer = new RangeMapPartitionComputer();
}
public void initialize() throws HyracksDataException {
@@ -76,8 +77,10 @@
abstract class AbstractFieldRangeSinglePartitionComputer extends AbstractFieldRangePartitionComputer
implements ITuplePartitionComputer {
- AbstractFieldRangeSinglePartitionComputer(IHyracksTaskContext taskContext) {
+ AbstractFieldRangeSinglePartitionComputer(IHyracksTaskContext taskContext,
+ Supplier<RangeMapPartitionComputer> supplier) {
super(taskContext);
+ this.rangeMapPartitionComputer = supplier.get();
}
@Override
@@ -96,6 +99,7 @@
AbstractFieldRangeMultiPartitionComputer(IHyracksTaskContext taskContext) {
super(taskContext);
+ this.rangeMapPartitionComputer = new RangeMapPartitionComputer();
}
@Override
@@ -127,11 +131,11 @@
throws HyracksDataException;
}
- final class RangeMapPartitionComputer {
+ class RangeMapPartitionComputer {
- private RangeMap rangeMap;
+ protected RangeMap rangeMap;
- private IBinaryComparator[] comparators;
+ protected IBinaryComparator[] comparators;
protected void initialize(IHyracksTaskContext taskContext) throws HyracksDataException {
rangeMap = rangeMapSupplier.getRangeMap(taskContext);
@@ -164,7 +168,7 @@
return (int) Math.floor(slotIndex / rangesPerPart);
}
- private int findRangeMapSlot(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields)
+ protected int findRangeMapSlot(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields)
throws HyracksDataException {
int slotIndex = 0;
for (int slotNumber = 0, n = rangeMap.getSplitCount(); slotNumber < n; ++slotNumber) {
@@ -190,7 +194,7 @@
return slotIndex;
}
- private int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields, int slotNumber)
+ protected int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields, int slotNumber)
throws HyracksDataException {
int c = 0;
int startOffset = accessor.getTupleStartOffset(tIndex);
@@ -209,4 +213,51 @@
return c;
}
}
+
+ final class PercentageRangeMapPartitionComputer extends RangeMapPartitionComputer {
+ private final Random r = new Random();
+
+ @Override
+ protected int findRangeMapSlot(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields)
+ throws HyracksDataException {
+ int slotIndex = 0;
+ for (int slotNumber = 0; slotNumber < rangeMap.getSplitCount(); ++slotNumber) {
+ int c = compareSlotAndFields(accessor, tIndex, rangeFields, slotNumber);
+ if (c == 0) {
+ double percent = 100 * r.nextDouble();
+ int slotIterator = slotNumber;
+
+ while (slotIterator < rangeMap.getSplitCount()
+ && compareSplittingVector(slotIterator, slotNumber) == 0) {
+ percent -= rangeMap.getPercentages()[slotIterator];
+ if (percent <= 0) {
+ break;
+ }
+ slotIterator++;
+ }
+ return slotIterator;
+ }
+ if (c < 0) {
+ return slotIndex;
+ }
+ slotIndex++;
+ }
+ return slotIndex;
+ }
+
+ private int compareSplittingVector(int slotNumber1, int slotNumber2) throws HyracksDataException {
+ int c = 0;
+ for (int fieldNum = 0; fieldNum < comparators.length; ++fieldNum) {
+ c = comparators[fieldNum].compare(rangeMap.getByteArray(),
+ rangeMap.getStartOffset(fieldNum, slotNumber1), rangeMap.getLength(fieldNum, slotNumber1),
+ rangeMap.getByteArray(), rangeMap.getStartOffset(fieldNum, slotNumber2),
+ rangeMap.getLength(fieldNum, slotNumber2));
+ if (c != 0) {
+ return c;
+ }
+ }
+ return c;
+ }
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
index 0816f2d..e8e8e14 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -29,19 +29,23 @@
public final class FieldRangePartitionComputerFactory extends AbstractFieldRangePartitionComputerFactory
implements ITuplePartitionComputerFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final int[] rangeFields;
+ private final boolean usePercentage;
+
public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
- RangeMapSupplier rangeMapSupplier, SourceLocation sourceLocation) {
+ RangeMapSupplier rangeMapSupplier, SourceLocation sourceLocation, boolean usePercentage) {
super(rangeMapSupplier, comparatorFactories, sourceLocation);
this.rangeFields = rangeFields;
+ this.usePercentage = usePercentage;
}
@Override
public ITuplePartitionComputer createPartitioner(IHyracksTaskContext taskContext) {
- return new AbstractFieldRangeSinglePartitionComputer(taskContext) {
+ return new AbstractFieldRangeSinglePartitionComputer(taskContext,
+ usePercentage ? PercentageRangeMapPartitionComputer::new : RangeMapPartitionComputer::new) {
@Override
protected int computePartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
index aaacceb..146de3b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
@@ -35,16 +35,18 @@
* </pre>
*/
public class RangeMap implements Serializable {
- private static final long serialVersionUID = -7523433293419648234L;
+ private static final long serialVersionUID = 1L;
private final int fields;
private final byte[] bytes;
private final int[] endOffsets;
+ private final double[] percentages;
- public RangeMap(int numFields, byte[] bytes, int[] endOffsets) {
+ public RangeMap(int numFields, byte[] bytes, int[] endOffsets, double[] percentages) {
this.fields = numFields;
this.bytes = bytes;
this.endOffsets = endOffsets;
+ this.percentages = percentages;
}
public int getSplitCount() {
@@ -55,6 +57,10 @@
return bytes;
}
+ public double[] getPercentages() {
+ return percentages;
+ }
+
public int getTag(int fieldIndex, int splitIndex) {
return getSplitValueTag(getSplitValueIndex(fieldIndex, splitIndex));
}
@@ -111,7 +117,7 @@
@Override
public int hashCode() {
- return fields + Arrays.hashCode(bytes) + Arrays.hashCode(endOffsets);
+ return fields + Arrays.hashCode(bytes) + Arrays.hashCode(endOffsets) + Arrays.hashCode(percentages);
}
@Override
@@ -124,11 +130,12 @@
}
RangeMap other = (RangeMap) object;
return fields == other.fields && Arrays.equals(endOffsets, other.endOffsets)
- && Arrays.equals(bytes, other.bytes);
+ && Arrays.equals(bytes, other.bytes) && Arrays.equals(percentages, other.percentages);
}
@Override
public String toString() {
return "{SPLIT:" + getSplitCount() + '}';
}
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
index 1daf9fb..b28b2d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.marshalling.ByteArraySerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.DoubleArraySerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.IntArraySerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
@@ -130,6 +131,7 @@
private int numFields;
private byte[] splitValues;
private int[] splitValuesEndOffsets;
+ private double[] percentages;
private RangeMapReaderActivityNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecordDescriptor,
ActivityId activityId, int partition) {
@@ -138,6 +140,7 @@
this.frameTupleReference = new FrameTupleReference();
this.activityId = activityId;
this.partition = partition;
+ this.numFields = -1;
}
@Override
@@ -165,6 +168,7 @@
numFields = IntegerSerializerDeserializer.read(dataInputStream);
splitValues = ByteArraySerializerDeserializer.read(dataInputStream);
splitValuesEndOffsets = IntArraySerializerDeserializer.read(dataInputStream);
+ percentages = DoubleArraySerializerDeserializer.read(dataInputStream);
}
@Override
@@ -181,7 +185,7 @@
// store the range map in the state object of ctx so that next activity (forward) could retrieve it
TaskId rangeMapReaderTaskId = new TaskId(activityId, partition);
RangeMapState rangeMapState = new RangeMapState(ctx.getJobletContext().getJobId(), rangeMapReaderTaskId);
- rangeMapState.rangeMap = new RangeMap(numFields, splitValues, splitValuesEndOffsets);
+ rangeMapState.rangeMap = new RangeMap(numFields, splitValues, splitValuesEndOffsets, percentages);
ctx.setStateObject(rangeMapState);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java
index a61cd40..668e20f 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java
@@ -136,7 +136,7 @@
for (int i = 0; i < integers.length; ++i) {
offsets[i] = (i + 1) * INTEGER_LENGTH;
}
- return new RangeMap(1, getIntegerBytes(integers), offsets);
+ return new RangeMap(1, getIntegerBytes(integers), offsets, null);
}
private ByteBuffer prepareData(IHyracksTaskContext ctx, Long[] startPoints, Long duration)
@@ -194,7 +194,7 @@
SourceLocation sourceLocation = new SourceLocation(0, 0);
ITuplePartitionComputerFactory itpcf = new FieldRangePartitionComputerFactory(rangeFields,
- minComparatorFactories, rangeMapSupplier, sourceLocation);
+ minComparatorFactories, rangeMapSupplier, sourceLocation, false);
executeFieldRangePartitionTests(integers, itpcf, nParts, results, duration);