Fix for issue 589.
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 0211d6c..ad363f3 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -16,7 +16,6 @@
package edu.uci.ics.asterix.common.context;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -34,21 +33,22 @@
private final IVirtualBufferCache datasetBufferCache;
private final int datasetID;
// Number of active operations on a ILSMIndex instance.
- private AtomicInteger numActiveOperations;
+ private int numActiveOperations;
public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
super(ioOpCallbackFactory);
this.datasetLifecycleManager = datasetLifecycleManager;
- this.numActiveOperations = new AtomicInteger(0);
+ this.numActiveOperations = 0;
this.datasetID = datasetID;
datasetBufferCache = datasetLifecycleManager.getVirtualBufferCache(datasetID);
}
@Override
- public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
- IModificationOperationCallback modificationCallback) throws HyracksDataException {
- numActiveOperations.incrementAndGet();
+ public synchronized void beforeOperation(ILSMIndex index, LSMOperationType opType,
+ ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
+ throws HyracksDataException {
+ numActiveOperations++;
}
@Override
@@ -64,8 +64,10 @@
@Override
public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
- int nActiveOps = numActiveOperations.decrementAndGet();
-
+ int nActiveOps;
+ synchronized (this) {
+ nActiveOps = numActiveOperations--;
+ }
if (opType != LSMOperationType.FLUSH) {
flushIfFull(nActiveOps);
}
@@ -74,6 +76,11 @@
private void flushIfFull(int nActiveOps) throws HyracksDataException {
// If we need a flush, and this is the last completing operation, then schedule the flush.
if (datasetBufferCache.isFull() && nActiveOps == 0) {
+ synchronized (this) {
+ if (numActiveOperations > 0) {
+ return;
+ }
+ }
Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
for (ILSMIndex lsmIndex : indexes) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
@@ -85,11 +92,13 @@
}
public void exclusiveJobCommitted() throws HyracksDataException {
- numActiveOperations.set(0);
+ synchronized (this) {
+ numActiveOperations = 0;
+ }
flushIfFull(0);
}
- public int getNumActiveOperations() {
- return numActiveOperations.get();
+ public synchronized int getNumActiveOperations() {
+ return numActiveOperations;
}
}