[NO ISSUE][RT] Randomized Sampling

-user model changes: no
-storage formate changes: no
-interface changes: no

Details:
 - LocalSamplingAggregateDescriptor now takes
   random samples instead of first 100

Change-Id: I8889f49b127773d90c6ef8a9f09d8993a4da68ac
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4223
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
index 5e0f0d9..51431d3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/LocalSamplingAggregateDescriptor.java
@@ -19,6 +19,8 @@
 package org.apache.asterix.runtime.aggregates.std;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
 
 import org.apache.asterix.common.config.CompilerProperties;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -42,7 +44,6 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -97,7 +98,9 @@
         private final IPointable inputFieldValue = new VoidPointable();
         private final int numSamplesRequired;
         private final IScalarEvaluator[] sampledFieldsEval;
-        private int numSamples;
+        private final ArrayList<ArrayBackedValueStorage> samples = new ArrayList<>();
+        private final Random random = new Random();
+        private int count = 0;
 
         /**
          * @param args the fields that constitute a sample, e.g., $$1, $$2
@@ -119,10 +122,9 @@
 
         @Override
         public void init() throws HyracksDataException {
-            numSamples = 0;
+            samples.clear();
+            count = 0;
             rangeMapBits.reset();
-            // write a dummy integer at the beginning to be filled later with the actual number of samples taken
-            IntegerSerializerDeserializer.write(0, rangeMapBits.getDataOutput());
         }
 
         /**
@@ -132,15 +134,27 @@
          */
         @Override
         public void step(IFrameTupleReference tuple) throws HyracksDataException {
-            if (numSamples >= numSamplesRequired) {
+            count++;
+            if (samples.size() < numSamplesRequired) {
+                ArrayBackedValueStorage sampleStorage = new ArrayBackedValueStorage();
+                writeTupleKey(tuple, sampleStorage);
+                samples.add(sampleStorage);
                 return;
             }
-            for (int i = 0; i < sampledFieldsEval.length; i++) {
-                sampledFieldsEval[i].evaluate(tuple, inputFieldValue);
-                IntegerSerializerDeserializer.write(inputFieldValue.getLength(), rangeMapBits.getDataOutput());
-                rangeMapBits.append(inputFieldValue);
+            int swap = random.nextInt(count);
+            if (swap < numSamplesRequired) {
+                writeTupleKey(tuple, samples.get(swap));
             }
-            numSamples++;
+        }
+
+        private void writeTupleKey(IFrameTupleReference tuple, ArrayBackedValueStorage storage)
+                throws HyracksDataException {
+            storage.reset();
+            for (IScalarEvaluator iScalarEvaluator : sampledFieldsEval) {
+                iScalarEvaluator.evaluate(tuple, inputFieldValue);
+                IntegerSerializerDeserializer.write(inputFieldValue.getLength(), storage.getDataOutput());
+                storage.append(inputFieldValue);
+            }
         }
 
         /**
@@ -151,7 +165,7 @@
         @Override
         public void finish(IPointable result) throws HyracksDataException {
             storage.reset();
-            if (numSamples == 0) {
+            if (samples.size() == 0) {
                 // empty partition? then send system null as an indication of empty partition.
                 try {
                     storage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
@@ -160,7 +174,11 @@
                     throw HyracksDataException.create(e);
                 }
             } else {
-                IntegerPointable.setInteger(rangeMapBits.getByteArray(), rangeMapBits.getStartOffset(), numSamples);
+                rangeMapBits.reset();
+                IntegerSerializerDeserializer.write(samples.size(), rangeMapBits.getDataOutput());
+                for (ArrayBackedValueStorage sample : samples) {
+                    rangeMapBits.append(sample);
+                }
                 binary.setValue(rangeMapBits.getByteArray(), rangeMapBits.getStartOffset(), rangeMapBits.getLength());
                 binarySerde.serialize(binary, storage.getDataOutput());
                 result.set(storage);