Fixed a race in the datagen thread, reported by Sattam.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1169 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
index db753e3..2f62231 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
@@ -15,7 +15,7 @@
public final BlockingQueue<TupleBatch> tupleBatchQueue;
private final int maxNumBatches;
private final int maxOutstandingBatches;
- private int numBatches;
+ private int numBatches = 0;
private final Random rnd;
// maxOutstandingBatches pre-created tuple-batches for populating the queue.
@@ -31,29 +31,40 @@
for (int i = 0; i < maxOutstandingBatches; i++) {
tupleBatches[i] = new TupleBatch(batchSize, fieldGens, fieldSerdes, payloadSize);
}
- // make sure we don't overwrite tuples that are in use by consumers.
- // -1 because we first generate a new tuple, and then try to put it into the queue.
- int capacity = Math.max(maxOutstandingBatches - numConsumers - 1, 1);
- tupleBatchQueue = new LinkedBlockingQueue<TupleBatch>(capacity);
+ tupleBatchQueue = new LinkedBlockingQueue<TupleBatch>(maxOutstandingBatches);
ringPos = 0;
}
@Override
public void run() {
while(numBatches < maxNumBatches) {
+ boolean added = false;
try {
- tupleBatches[ringPos].generate();
- tupleBatchQueue.put(tupleBatches[ringPos]);
+ if (tupleBatches[ringPos].inUse.compareAndSet(false, true)) {
+ tupleBatches[ringPos].generate();
+ tupleBatchQueue.put(tupleBatches[ringPos]);
+ added = true;
+ }
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
- numBatches++;
- ringPos++;
- if (ringPos >= maxOutstandingBatches) {
- ringPos = 0;
+ if (added) {
+ numBatches++;
+ ringPos++;
+ if (ringPos >= maxOutstandingBatches) {
+ ringPos = 0;
+ }
}
}
}
+
+ public TupleBatch getBatch() throws InterruptedException {
+ return tupleBatchQueue.take();
+ }
+
+ public void releaseBatch(TupleBatch batch) {
+ batch.inUse.set(false);
+ }
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/TupleBatch.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/TupleBatch.java
index 4c202c0..c500c94 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/TupleBatch.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/TupleBatch.java
@@ -1,6 +1,7 @@
package edu.uci.ics.hyracks.storage.am.common.datagen;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -8,6 +9,7 @@
public class TupleBatch {
private final int size;
private final TupleGenerator[] tupleGens;
+ public final AtomicBoolean inUse = new AtomicBoolean(false);
public TupleBatch(int size, IFieldValueGenerator[] fieldGens, ISerializerDeserializer[] fieldSerdes, int payloadSize) {
this.size = size;
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/AbstractTreeIndexTestWorker.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/AbstractTreeIndexTestWorker.java
index 4e853f3..a98efde 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/AbstractTreeIndexTestWorker.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/AbstractTreeIndexTestWorker.java
@@ -43,12 +43,13 @@
public void run() {
try {
for (int i = 0; i < numBatches; i++) {
- TupleBatch batch = dataGen.tupleBatchQueue.take();
+ TupleBatch batch = dataGen.getBatch();
for (int j = 0; j < batch.size(); j++) {
TestOperation op = opSelector.getOp(rnd.nextInt());
ITupleReference tuple = batch.get(j);
performOp(tuple, op);
}
+ dataGen.releaseBatch(batch);
}
} catch (Exception e) {
e.printStackTrace();
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/TreeIndexMultiThreadTestDriver.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/TreeIndexMultiThreadTestDriver.java
index 4395c01..bc3350b 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/TreeIndexMultiThreadTestDriver.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/test/TreeIndexMultiThreadTestDriver.java
@@ -25,7 +25,6 @@
@SuppressWarnings("rawtypes")
public class TreeIndexMultiThreadTestDriver {
private static final int RANDOM_SEED = 50;
- private static final int MAX_OUTSTANDING_BATCHES = 10;
// Means no additional payload. Only the specified fields.
private static final int PAYLOAD_SIZE = 0;
private final TestOperationSelector opSelector;
@@ -84,6 +83,6 @@
// To allow subclasses to override the data gen params.
public DataGenThread createDatagenThread(int numThreads, int numBatches, int batchSize) {
- return new DataGenThread(numThreads, numBatches, batchSize, fieldSerdes, PAYLOAD_SIZE, RANDOM_SEED, MAX_OUTSTANDING_BATCHES, false);
+ return new DataGenThread(numThreads, numBatches, batchSize, fieldSerdes, PAYLOAD_SIZE, RANDOM_SEED, 2*numThreads, false);
}
}