Changed the io scheduler to allow asynchronous concurrent flush/merge ops.
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index 1aab213..5802421 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -82,4 +82,10 @@
public ILSMComponent getFlushingComponent() {
return flushingComponent;
}
+
+ @Override
+ public Boolean call() throws Exception {
+ perform();
+ return true;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index bfa6000..ef5c113 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -93,4 +93,10 @@
public List<ILSMComponent> getMergingComponents() {
return mergingComponents;
}
+
+ @Override
+ public Boolean call() throws Exception {
+ perform();
+ return true;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index c23dcea..9118cc3 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -15,12 +15,13 @@
package edu.uci.ics.hyracks.storage.am.lsm.common.api;
import java.util.Set;
+import java.util.concurrent.Callable;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-public interface ILSMIOOperation {
+public interface ILSMIOOperation extends Callable<Boolean> {
public Set<IODeviceHandle> getReadDevices();
public Set<IODeviceHandle> getWriteDevices();
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
index 0554dc3..25894f1 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
@@ -14,58 +14,24 @@
*/
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
public class AsynchronousScheduler implements ILSMIOOperationScheduler {
public final static AsynchronousScheduler INSTANCE = new AsynchronousScheduler();
-
- private OperationPerformerThread operationPerformerThread;
-
- private AsynchronousScheduler() {
- operationPerformerThread = new OperationPerformerThread();
- }
+ private ExecutorService executor;
public void init(ThreadFactory threadFactory) {
- Executor executor = Executors.newCachedThreadPool(threadFactory);
- executor.execute(operationPerformerThread);
+ executor = Executors.newCachedThreadPool(threadFactory);
}
@Override
public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
- operationPerformerThread.perform(operation);
+ executor.submit(operation);
}
-}
-
-class OperationPerformerThread extends Thread {
-
- private final LinkedBlockingQueue<ILSMIOOperation> operationsQueue = new LinkedBlockingQueue<ILSMIOOperation>();
-
- public void perform(ILSMIOOperation operation) {
- operationsQueue.offer(operation);
- }
-
- @Override
- public void run() {
- while (true) {
- ILSMIOOperation operation;
- try {
- operation = operationsQueue.take();
- } catch (InterruptedException e) {
- break;
- }
- try {
- operation.perform();
- } catch (HyracksDataException | IndexException e) {
- throw new RuntimeException(e);
- }
- }
- }
-}
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index cf5820e..242d1bb 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -87,4 +87,10 @@
public ILSMComponent getFlushingComponent() {
return flushingComponent;
}
+
+ @Override
+ public Boolean call() throws Exception {
+ perform();
+ return true;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index e3d9a13..2d4bc9e 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -103,4 +103,9 @@
return mergingComponents;
}
+ @Override
+ public Boolean call() throws Exception {
+ perform();
+ return true;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index 68098bb..77083d5 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -88,4 +88,10 @@
public ILSMComponent getFlushingComponent() {
return flushingComponent;
}
+
+ @Override
+ public Boolean call() throws Exception {
+ perform();
+ return true;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index 4e2fab7..8d95c41 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -103,4 +103,10 @@
public List<ILSMComponent> getMergingComponents() {
return mergingComponents;
}
+
+ @Override
+ public Boolean call() throws Exception {
+ perform();
+ return true;
+ }
}