Modified Asterix operation tracker according to recent change in its interface in Hyracks.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@821 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
index b2391ae..6713492 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
@@ -33,20 +33,24 @@
private final ILSMIndex index;
private final ILSMIndexAccessor accessor;
private final ILSMIOOperationCallback ioOpCallback;
-
+
public IndexOperationTracker(ILSMIndex index, ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
this.index = index;
accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
ioOpCallback = ioOpCallbackFactory.createIOOperationCallback(this);
- }
+ }
@Override
- public synchronized void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ public synchronized boolean beforeOperation(ILSMIndexOperationContext opCtx, boolean tryOperation)
+ throws HyracksDataException {
// Wait for pending flushes to complete.
// If flushFlag is set, then the flush is queued to occur by the last completing operation.
// This operation should wait for that flush to occur before proceeding.
if (index.getFlushController().getFlushStatus(index)) {
+ if (tryOperation) {
+ return false;
+ }
try {
this.wait();
} catch (InterruptedException e) {
@@ -54,10 +58,12 @@
}
}
numActiveOperations++;
-
+
// Increment transactor-local active operations count.
AbstractOperationCallback opCallback = getOperationCallback(opCtx);
opCallback.incrementLocalNumActiveOperations();
+
+ return true;
}
@Override
@@ -76,14 +82,14 @@
// Decrement transactor-local active operations count.
AbstractOperationCallback opCallback = getOperationCallback(opCtx);
opCallback.decrementLocalNumActiveOperations();
-
+
// If we need a flush, and this is the last completing operation, then schedule the flush.
// Once the flush has completed notify all waiting operations.
if (index.getFlushController().getFlushStatus(index) && numActiveOperations == 0) {
index.getIOScheduler().scheduleOperation(accessor.createFlushOperation(ioOpCallback));
}
}
-
+
private AbstractOperationCallback getOperationCallback(ILSMIndexOperationContext opCtx) {
IndexOperation op = opCtx.getOperation();
if (op == IndexOperation.SEARCH || op == IndexOperation.DISKORDERSCAN) {
@@ -92,11 +98,11 @@
return (AbstractOperationCallback) opCtx.getModificationCallback();
}
}
-
+
public long getLastLSN() {
return lastLsn;
}
-
+
public void setLastLSN(long lastLsn) {
this.lastLsn = lastLsn;
}