[NO ISSUE][RT] Randomized Sampling
-user model changes: no
-storage formate changes: no
-interface changes: no
Details:
- LocalSamplingAggregateDescriptor now takes
random samples instead of first 100
Change-Id: I8889f49b127773d90c6ef8a9f09d8993a4da68ac
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4223
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
index 5e0f0d9..51431d3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
@@ -19,6 +19,8 @@
package org.apache.asterix.runtime.aggregates.std;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -42,7 +44,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -97,7 +98,9 @@
private final IPointable inputFieldValue = new VoidPointable();
private final int numSamplesRequired;
private final IScalarEvaluator[] sampledFieldsEval;
- private int numSamples;
+ private final ArrayList<ArrayBackedValueStorage> samples = new ArrayList<>();
+ private final Random random = new Random();
+ private int count = 0;
/**
* @param args the fields that constitute a sample, e.g., $$1, $$2
@@ -119,10 +122,9 @@
@Override
public void init() throws HyracksDataException {
- numSamples = 0;
+ samples.clear();
+ count = 0;
rangeMapBits.reset();
- // write a dummy integer at the beginning to be filled later with the actual number of samples taken
- IntegerSerializerDeserializer.write(0, rangeMapBits.getDataOutput());
}
/**
@@ -132,15 +134,27 @@
*/
@Override
public void step(IFrameTupleReference tuple) throws HyracksDataException {
- if (numSamples >= numSamplesRequired) {
+ count++;
+ if (samples.size() < numSamplesRequired) {
+ ArrayBackedValueStorage sampleStorage = new ArrayBackedValueStorage();
+ writeTupleKey(tuple, sampleStorage);
+ samples.add(sampleStorage);
return;
}
- for (int i = 0; i < sampledFieldsEval.length; i++) {
- sampledFieldsEval[i].evaluate(tuple, inputFieldValue);
- IntegerSerializerDeserializer.write(inputFieldValue.getLength(), rangeMapBits.getDataOutput());
- rangeMapBits.append(inputFieldValue);
+ int swap = random.nextInt(count);
+ if (swap < numSamplesRequired) {
+ writeTupleKey(tuple, samples.get(swap));
}
- numSamples++;
+ }
+
+ private void writeTupleKey(IFrameTupleReference tuple, ArrayBackedValueStorage storage)
+ throws HyracksDataException {
+ storage.reset();
+ for (IScalarEvaluator iScalarEvaluator : sampledFieldsEval) {
+ iScalarEvaluator.evaluate(tuple, inputFieldValue);
+ IntegerSerializerDeserializer.write(inputFieldValue.getLength(), storage.getDataOutput());
+ storage.append(inputFieldValue);
+ }
}
/**
@@ -151,7 +165,7 @@
@Override
public void finish(IPointable result) throws HyracksDataException {
storage.reset();
- if (numSamples == 0) {
+ if (samples.size() == 0) {
// empty partition? then send system null as an indication of empty partition.
try {
storage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
@@ -160,7 +174,11 @@
throw HyracksDataException.create(e);
}
} else {
- IntegerPointable.setInteger(rangeMapBits.getByteArray(), rangeMapBits.getStartOffset(), numSamples);
+ rangeMapBits.reset();
+ IntegerSerializerDeserializer.write(samples.size(), rangeMapBits.getDataOutput());
+ for (ArrayBackedValueStorage sample : samples) {
+ rangeMapBits.append(sample);
+ }
binary.setValue(rangeMapBits.getByteArray(), rangeMapBits.getStartOffset(), rangeMapBits.getLength());
binarySerde.serialize(binary, storage.getDataOutput());
result.set(storage);