Added 'try' versions of LSM index operations to allow their caller to avoid potential deadlocks involving LSM flushes.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@2413 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 6251d69..de4e627 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -92,6 +92,5 @@
@Override
public void fail() throws HyracksDataException {
- writer.fail();
}
}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 1c70af5..9c08645 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -25,8 +25,8 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
public interface ILSMHarness {
- public void insertUpdateOrDelete(ITupleReference tuple, ILSMIndexOperationContext ictx) throws HyracksDataException,
- IndexException;
+ public boolean insertUpdateOrDelete(ITupleReference tuple, ILSMIndexOperationContext ictx, boolean tryOperation)
+ throws HyracksDataException, IndexException;
public List<Object> search(IIndexCursor cursor, ISearchPredicate pred, ILSMIndexOperationContext ctx,
boolean includeMemComponent) throws HyracksDataException, IndexException;
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index a5ef536..c6564b4 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -32,7 +32,7 @@
public ILSMIOOperation createFlushOperation(ILSMIOOperationCallback callback);
public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
- IndexException;
+ IndexException;
/**
* Force a flush of the in-memory component.
@@ -57,4 +57,68 @@
* @throws IndexException
*/
public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ /**
+ * Attempts to insert the given tuple.
+ * If the insert would have to wait for a flush to complete, then this method returns false to
+ * allow the caller to avoid potential deadlock situations.
+ * Otherwise, returns true (insert was successful).
+ *
+ * @param tuple
+ * Tuple to be inserted.
+ * @throws HyracksDataException
+ * If the BufferCache throws while un/pinning or un/latching.
+ * @throws IndexException
+ * If an index-specific constraint is violated, e.g., the key
+ * already exists.
+ */
+ public boolean tryInsert(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ /**
+ * Attempts to delete the given tuple.
+ * If the delete would have to wait for a flush to complete, then this method returns false to
+ * allow the caller to avoid potential deadlock situations.
+ * Otherwise, returns true (delete was successful).
+ *
+ * @param tuple
+ * Tuple to be deleted.
+ * @throws HyracksDataException
+ * If the BufferCache throws while un/pinning or un/latching.
+ * @throws IndexException
+ * If there is no matching tuple in the index.
+ */
+ public boolean tryDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ /**
+ * Attempts to update the given tuple.
+ * If the update would have to wait for a flush to complete, then this method returns false to
+ * allow the caller to avoid potential deadlock situations.
+ * Otherwise, returns true (update was successful).
+ *
+ * @param tuple
+ * Tuple whose match in the index is to be update with the given
+ * tuples contents.
+ * @throws HyracksDataException
+ * If the BufferCache throws while un/pinning or un/latching.
+ * @throws IndexException
+ * If there is no matching tuple in the index.
+ */
+ public boolean tryUpdate(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ /**
+ * This operation is only supported by indexes with the notion of a unique key.
+ * If tuple's key already exists, then this operation attempts to performs an update.
+ * Otherwise, it attempts to perform an insert.
+ * If the operation would have to wait for a flush to complete, then this method returns false to
+ * allow the caller to avoid potential deadlock situations.
+ * Otherwise, returns true (insert/update was successful).
+ *
+ * @param tuple
+ * Tuple to be deleted.
+ * @throws HyracksDataException
+ * If the BufferCache throws while un/pinning or un/latching.
+ * @throws IndexException
+ * If there is no matching tuple in the index.
+ */
+ public boolean tryUpsert(ITupleReference tuple) throws HyracksDataException, IndexException;
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
index 6c5e1a1..1635647 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
@@ -15,9 +15,11 @@
/**
* An {@link ILSMIndex} will call this method before an operation enters it,
* i.e., before any latches are taken.
- * After this method has been called, the operation is considered 'active'.
+ * If tryOperation is true, and the operation would have to wait for a flush,
+ * then this method does not block and returns false.
+ * Otherwise, this method returns true, and the operation is considered 'active' in the index.
*/
- public void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException;
+ public boolean beforeOperation(ILSMIndexOperationContext opCtx, boolean tryOperation) throws HyracksDataException;
/**
* An {@link ILSMIndex} will call this method after an operation has left the index,
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 634800a..c4ac36e 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -89,9 +89,12 @@
opTracker.afterOperation(opCtx);
}
- public void insertUpdateOrDelete(ITupleReference tuple, ILSMIndexOperationContext ctx) throws HyracksDataException,
- IndexException {
- opTracker.beforeOperation(ctx);
+ @Override
+ public boolean insertUpdateOrDelete(ITupleReference tuple, ILSMIndexOperationContext ctx, boolean tryOperation)
+ throws HyracksDataException, IndexException {
+ if (!opTracker.beforeOperation(ctx, tryOperation)) {
+ return false;
+ }
// It is possible, due to concurrent execution of operations, that an operation will
// fail. In such a case, simply retry the operation. Refer to the specific LSMIndex code
// to see exactly why an operation might fail.
@@ -100,6 +103,7 @@
} finally {
threadExit(ctx);
}
+ return true;
}
public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
@@ -135,7 +139,7 @@
// If the search doesn't include the in-memory component, then we don't have
// to synchronize with a flush.
if (includeMemComponent) {
- opTracker.beforeOperation(ctx);
+ opTracker.beforeOperation(ctx, true);
}
// Get a snapshot of the current on-disk Trees.
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index a4267d2..fd0d704 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -39,29 +39,54 @@
@Override
public void insert(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.INSERT);
- lsmHarness.insertUpdateOrDelete(tuple, ctx);
+ lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
}
@Override
public void update(ITupleReference tuple) throws HyracksDataException, IndexException {
// Update is the same as insert.
ctx.setOperation(IndexOperation.UPDATE);
- lsmHarness.insertUpdateOrDelete(tuple, ctx);
+ lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
}
@Override
public void delete(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.DELETE);
- lsmHarness.insertUpdateOrDelete(tuple, ctx);
+ lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
}
@Override
public void upsert(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.UPSERT);
- lsmHarness.insertUpdateOrDelete(tuple, ctx);
+ lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
}
@Override
+ public boolean tryInsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.INSERT);
+ return lsmHarness.insertUpdateOrDelete(tuple, ctx, true);
+ }
+
+ @Override
+ public boolean tryDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.DELETE);
+ return lsmHarness.insertUpdateOrDelete(tuple, ctx, true);
+ }
+
+ @Override
+ public boolean tryUpdate(ITupleReference tuple) throws HyracksDataException, IndexException {
+ // Update is the same as insert.
+ ctx.setOperation(IndexOperation.UPDATE);
+ return lsmHarness.insertUpdateOrDelete(tuple, ctx, true);
+ }
+
+ @Override
+ public boolean tryUpsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.UPSERT);
+ return lsmHarness.insertUpdateOrDelete(tuple, ctx, true);
+ }
+
+ @Override
public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.SEARCH);
lsmHarness.search(cursor, searchPred, ctx, true);
@@ -80,7 +105,7 @@
@Override
public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.PHYSICALDELETE);
- lsmHarness.insertUpdateOrDelete(tuple, ctx);
+ lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
}
@Override
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
index 00dc66d..f8a8a28 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
@@ -16,7 +16,7 @@
private static final long serialVersionUID = 1L;
public static NoOpOperationTrackerFactory INSTANCE = new NoOpOperationTrackerFactory();
-
+
@Override
public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
return new ILSMOperationTracker() {
@@ -27,8 +27,10 @@
}
@Override
- public void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
- // Do nothing.
+ public boolean beforeOperation(ILSMIndexOperationContext opCtx, boolean tryOperation)
+ throws HyracksDataException {
+ // Do nothing.
+ return true;
}
@Override
@@ -37,9 +39,9 @@
}
};
}
-
+
// Enforce singleton.
private NoOpOperationTrackerFactory() {
}
-
+
};
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java
index 5b10ba3..b77a97e 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java
@@ -20,11 +20,15 @@
}
@Override
- public synchronized void beforeOperation(ILSMIndexOperationContext opCtx) throws HyracksDataException {
+ public synchronized boolean beforeOperation(ILSMIndexOperationContext opCtx, boolean tryOperation)
+ throws HyracksDataException {
// Wait for pending flushes to complete.
// If flushFlag is set, then the flush is queued to occur by the last exiting thread.
// This operation should wait for that flush to occur before proceeding.
if (index.getFlushController().getFlushStatus(index)) {
+ if (tryOperation) {
+ return false;
+ }
try {
this.wait();
} catch (InterruptedException e) {
@@ -32,6 +36,7 @@
}
}
threadRefCount++;
+ return true;
}
@Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 2e78744..45d21e9 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -50,13 +50,25 @@
@Override
public void insert(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.INSERT);
- lsmHarness.insertUpdateOrDelete(tuple, ctx);
+ lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
}
@Override
public void delete(ITupleReference tuple) throws HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.DELETE);
- lsmHarness.insertUpdateOrDelete(tuple, ctx);
+ lsmHarness.insertUpdateOrDelete(tuple, ctx, false);
+ }
+
+ @Override
+ public boolean tryInsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.INSERT);
+ return lsmHarness.insertUpdateOrDelete(tuple, ctx, true);
+ }
+
+ @Override
+ public boolean tryDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.DELETE);
+ return lsmHarness.insertUpdateOrDelete(tuple, ctx, true);
}
public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException, IndexException {
@@ -122,6 +134,16 @@
}
@Override
+ public boolean tryUpdate(ITupleReference tuple) throws HyracksDataException, IndexException {
+ throw new UnsupportedOperationException("Update not supported by lsm inverted index.");
+ }
+
+ @Override
+ public boolean tryUpsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+ throw new UnsupportedOperationException("Upsert not supported by lsm inverted index.");
+ }
+
+ @Override
public IInvertedListCursor createInvertedListCursor() {
throw new UnsupportedOperationException("Cannot create inverted list cursor on lsm inverted index.");
}