Improve KV Generator

Currently, KV Generator produces the same documents when
running in parallel where the only difference is in the
key of the record. With this change, it produces different
Documents in each partition to produce more diverse set of
records.

Change-Id: Ia337be57f8394d74a98f3c72ebcaa584f277f34b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/724
Reviewed-by: Michael Blow <michael.blow@couchbase.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
index 9e797e3..f540f4e 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
@@ -45,6 +45,7 @@
     private final ArrayList<Short> assigned;
     private final int numberOfMutations;
     private int counter = 0;
+    private int upsertCounter = 0;
     private boolean stopped = false;
     // for deterministic data generation
     private int expiration = 7999;
@@ -59,14 +60,13 @@
     private short nextUpsertPartition;
     private final ByteBuf byteBuff;
     private final StringBuilder strBuilder = new StringBuilder();
-    private int upsertCounter = 0;
     private final String[] names = { "Michael Carey", "Till Westmann", "Michael Blow", "Chris Hillary", "Yingyi Bu",
             "Ian Maxon", "Abdullah Alamoudi" };
 
-    public KVTestReader(final int partition, final String bucket, final int[] schedule,
-            final int numberOfMutations, final int deleteCycle, final int upsertCycle) {
+    public KVTestReader(final int partition, final String bucket, final int[] schedule, final int numberOfMutations,
+            final int deleteCycle, final int upsertCycle, int counterStart) {
         this.bucket = bucket;
-        this.numberOfMutations = numberOfMutations;
+        this.numberOfMutations = numberOfMutations + counterStart;
         this.assigned = new ArrayList<>();
         this.deleteCycle = deleteCycle;
         this.upsertCycle = upsertCycle;
@@ -82,6 +82,7 @@
         this.byteBuff = ByteBufAllocator.DEFAULT.buffer(ExternalDataConstants.DEFAULT_BUFFER_SIZE);
         byteBuff.retain();
         this.record = new GenericRecord<DCPRequest>();
+        this.counter = counterStart;
     }
 
     private String generateKey() {
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
index 8242554..adaa525 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -37,11 +37,13 @@
     private int numOfRecords = 1000; // default = 1 Million
     private int deleteCycle = 0;
     private int upsertCycle = 0;
+    private int numOfReaders;
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 
     @Override
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
         clusterLocations = AsterixClusterProperties.INSTANCE.getClusterLocations();
+        numOfReaders = clusterLocations.getLocations().length;
         return clusterLocations;
     }
 
@@ -68,7 +70,7 @@
     public IRecordReader<? extends DCPRequest> createRecordReader(final IHyracksTaskContext ctx, final int partition) {
         return new KVTestReader(partition, bucket, schedule,
                 (int) Math.ceil((double) numOfRecords / (double) getPartitionConstraint().getLocations().length),
-                deleteCycle, upsertCycle);
+                deleteCycle, upsertCycle, (numOfRecords / numOfReaders) * partition);
     }
 
     @Override
diff --git a/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java b/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java
index 14b6ed1..b08fc7d 100644
--- a/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java
+++ b/asterix-external-data/src/test/java/org/apache/asterix/external/generator/test/DCPGeneratorTest.java
@@ -32,7 +32,7 @@
     @Test
     public void runTest() throws Exception {
         try (KVTestReader cbreader = new KVTestReader(0, "TestBucket",
-                new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }, 150, 0, 0)) {
+                new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }, 150, 0, 0, 0)) {
             final UTF8StringPointable pointable = new UTF8StringPointable();
             final DCPRequestToRecordWithMetadataAndPKConverter converter = new DCPRequestToRecordWithMetadataAndPKConverter();
             while (cbreader.hasNext()) {