Fixed a race in the datagen thread, reported by Sattam.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1169 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
index db753e3..2f62231 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
@@ -15,7 +15,7 @@
     public final BlockingQueue<TupleBatch> tupleBatchQueue;
     private final int maxNumBatches;
     private final int maxOutstandingBatches;        
-    private int numBatches;
+    private int numBatches = 0;
     private final Random rnd;
     
     // maxOutstandingBatches pre-created tuple-batches for populating the queue.
@@ -31,29 +31,40 @@
         for (int i = 0; i < maxOutstandingBatches; i++) {
             tupleBatches[i] = new TupleBatch(batchSize, fieldGens, fieldSerdes, payloadSize);
         }
-        // make sure we don't overwrite tuples that are in use by consumers. 
-        // -1 because we first generate a new tuple, and then try to put it into the queue.
-        int capacity = Math.max(maxOutstandingBatches - numConsumers - 1, 1);
-        tupleBatchQueue = new LinkedBlockingQueue<TupleBatch>(capacity);
+        tupleBatchQueue = new LinkedBlockingQueue<TupleBatch>(maxOutstandingBatches);
         ringPos = 0;
     }
     
     @Override
     public void run() {
         while(numBatches < maxNumBatches) {
+            boolean added = false;
             try {
-                tupleBatches[ringPos].generate();
-                tupleBatchQueue.put(tupleBatches[ringPos]);
+                if (tupleBatches[ringPos].inUse.compareAndSet(false, true)) {                    
+                    tupleBatches[ringPos].generate();
+                    tupleBatchQueue.put(tupleBatches[ringPos]);
+                    added = true;
+                }
             } catch (IOException e) {
                 e.printStackTrace();
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
-            numBatches++;
-            ringPos++;
-            if (ringPos >= maxOutstandingBatches) {
-                ringPos = 0;
+            if (added) {
+                numBatches++;
+                ringPos++;
+                if (ringPos >= maxOutstandingBatches) {
+                    ringPos = 0;
+                }
             }
         }
     }
+    
+    public TupleBatch getBatch() throws InterruptedException {
+        return tupleBatchQueue.take();
+    }
+    
+    public void releaseBatch(TupleBatch batch) {
+        batch.inUse.set(false);
+    }
 }
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/TupleBatch.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/TupleBatch.java
index 4c202c0..c500c94 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/TupleBatch.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/TupleBatch.java
@@ -1,6 +1,7 @@
 package edu.uci.ics.hyracks.storage.am.common.datagen;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -8,6 +9,7 @@
 public class TupleBatch {
     private final int size;
     private final TupleGenerator[] tupleGens;
+    public final AtomicBoolean inUse = new AtomicBoolean(false);
     
     public TupleBatch(int size, IFieldValueGenerator[] fieldGens, ISerializerDeserializer[] fieldSerdes, int payloadSize) {        
         this.size = size;
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/AbstractTreeIndexTestWorker.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/AbstractTreeIndexTestWorker.java
index 4e853f3..a98efde 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/AbstractTreeIndexTestWorker.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/AbstractTreeIndexTestWorker.java
@@ -43,12 +43,13 @@
     public void run() {
         try {
             for (int i = 0; i < numBatches; i++) {
-                TupleBatch batch = dataGen.tupleBatchQueue.take();
+                TupleBatch batch = dataGen.getBatch();     
                 for (int j = 0; j < batch.size(); j++) {
                     TestOperation op = opSelector.getOp(rnd.nextInt());
                     ITupleReference tuple = batch.get(j);
                     performOp(tuple, op);
                 }
+                dataGen.releaseBatch(batch);
             }
         } catch (Exception e) {
             e.printStackTrace();
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/TreeIndexMultiThreadTestDriver.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/TreeIndexMultiThreadTestDriver.java
index 4395c01..bc3350b 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/TreeIndexMultiThreadTestDriver.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/TreeIndexMultiThreadTestDriver.java
@@ -25,7 +25,6 @@
 @SuppressWarnings("rawtypes")
 public class TreeIndexMultiThreadTestDriver {
     private static final int RANDOM_SEED = 50;
-    private static final int MAX_OUTSTANDING_BATCHES = 10;
     // Means no additional payload. Only the specified fields.
     private static final int PAYLOAD_SIZE = 0;
     private final TestOperationSelector opSelector;    
@@ -84,6 +83,6 @@
     
     // To allow subclasses to override the data gen params.
     public DataGenThread createDatagenThread(int numThreads, int numBatches, int batchSize) {
-        return new DataGenThread(numThreads, numBatches, batchSize, fieldSerdes, PAYLOAD_SIZE, RANDOM_SEED, MAX_OUTSTANDING_BATCHES, false);
+        return new DataGenThread(numThreads, numBatches, batchSize, fieldSerdes, PAYLOAD_SIZE, RANDOM_SEED, 2*numThreads, false);
     }
 }