Fix for asterix issue 630.
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
index 6e17ff8..9c2268d 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenThread.java
@@ -23,21 +23,20 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
/**
- * Quick & dirty data generator for multi-thread testing.
- *
+ * Quick & dirty data generator for multi-thread testing.
*/
@SuppressWarnings("rawtypes")
public class DataGenThread extends Thread {
public final BlockingQueue<TupleBatch> tupleBatchQueue;
private final int maxNumBatches;
- private final int maxOutstandingBatches;
+ private final int maxOutstandingBatches;
private int numBatches = 0;
private final Random rnd;
-
+
// maxOutstandingBatches pre-created tuple-batches for populating the queue.
private TupleBatch[] tupleBatches;
private int ringPos;
-
+
public DataGenThread(int numConsumers, int maxNumBatches, int batchSize, ISerializerDeserializer[] fieldSerdes,
int payloadSize, int rndSeed, int maxOutstandingBatches, boolean sorted) {
this.maxNumBatches = maxNumBatches;
@@ -51,7 +50,7 @@
tupleBatchQueue = new LinkedBlockingQueue<TupleBatch>(maxOutstandingBatches);
ringPos = 0;
}
-
+
public DataGenThread(int numConsumers, int maxNumBatches, int batchSize, ISerializerDeserializer[] fieldSerdes,
IFieldValueGenerator[] fieldGens, int rndSeed, int maxOutstandingBatches) {
this.maxNumBatches = maxNumBatches;
@@ -64,13 +63,13 @@
tupleBatchQueue = new LinkedBlockingQueue<TupleBatch>(maxOutstandingBatches);
ringPos = 0;
}
-
+
@Override
public void run() {
- while(numBatches < maxNumBatches) {
+ while (numBatches < maxNumBatches) {
boolean added = false;
try {
- if (tupleBatches[ringPos].inUse.compareAndSet(false, true)) {
+ if (tupleBatches[ringPos].inUse.compareAndSet(false, true)) {
tupleBatches[ringPos].generate();
tupleBatchQueue.put(tupleBatches[ringPos]);
added = true;
@@ -89,11 +88,11 @@
}
}
}
-
+
public TupleBatch getBatch() throws InterruptedException {
return tupleBatchQueue.take();
}
-
+
public void releaseBatch(TupleBatch batch) {
batch.inUse.set(false);
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java
index d7234fa..cb9c545 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/datagen/DataGenUtils.java
@@ -23,7 +23,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-@SuppressWarnings("rawtypes")
+@SuppressWarnings("rawtypes")
public class DataGenUtils {
public static IFieldValueGenerator getFieldGenFromSerde(ISerializerDeserializer serde, Random rnd, boolean sorted) {
if (serde instanceof IntegerSerializerDeserializer) {
@@ -49,8 +49,9 @@
}
return null;
}
-
- public static IFieldValueGenerator[] getFieldGensFromSerdes(ISerializerDeserializer[] serdes, Random rnd, boolean sorted) {
+
+ public static IFieldValueGenerator[] getFieldGensFromSerdes(ISerializerDeserializer[] serdes, Random rnd,
+ boolean sorted) {
IFieldValueGenerator[] fieldValueGens = new IFieldValueGenerator[serdes.length];
for (int i = 0; i < serdes.length; i++) {
fieldValueGens[i] = getFieldGenFromSerde(serdes[i], rnd, sorted);
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 3e51e20..28f44e6 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -54,7 +54,6 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
@@ -164,8 +163,7 @@
}
if (flushOnExit) {
- BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
- ioOpCallback);
+ BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(ioOpCallback);
ILSMIndexAccessor accessor = createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
accessor.scheduleFlush(cb);
try {
@@ -370,7 +368,8 @@
opCtx.getComponentHolder().add(flushingComponent);
ILSMIndexAccessorInternal flushAccessor = new LSMBTreeAccessor(lsmHarness, opCtx);
ioScheduler.scheduleOperation(new LSMBTreeFlushOperation(flushAccessor, flushingComponent, componentFileRefs
- .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback));
+ .getInsertIndexFileReference(), componentFileRefs.getBloomFilterFileReference(), callback, fileManager
+ .getBaseDir()));
}
@Override
@@ -442,7 +441,8 @@
.getName(), lastFile.getFile().getName());
ILSMIndexAccessorInternal accessor = new LSMBTreeAccessor(lsmHarness, opCtx);
ioScheduler.scheduleOperation(new LSMBTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
- .getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
+ .getInsertIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager
+ .getBaseDir()));
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
index 668e727..94b1569 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeFlushOperation.java
@@ -27,21 +27,24 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
-public class LSMBTreeFlushOperation implements ILSMIOOperation {
+public class LSMBTreeFlushOperation implements ILSMIOOperation, Comparable<LSMBTreeFlushOperation> {
private final ILSMIndexAccessorInternal accessor;
private final ILSMComponent flushingComponent;
private final FileReference btreeFlushTarget;
private final FileReference bloomFilterFlushTarget;
private final ILSMIOOperationCallback callback;
+ private final String indexIdentifier;
public LSMBTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
- FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback) {
+ FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback,
+ String indexIdentifier) {
this.accessor = accessor;
this.flushingComponent = flushingComponent;
this.btreeFlushTarget = btreeFlushTarget;
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
this.callback = callback;
+ this.indexIdentifier = indexIdentifier;
}
@Override
@@ -83,4 +86,19 @@
public ILSMComponent getFlushingComponent() {
return flushingComponent;
}
+
+ @Override
+ public String getIndexUniqueIdentifier() {
+ return indexIdentifier;
+ }
+
+ @Override
+ public LSMIOOpertionType getIOOpertionType() {
+ return LSMIOOpertionType.FLUSH;
+ }
+
+ @Override
+ public int compareTo(LSMBTreeFlushOperation o) {
+ return btreeFlushTarget.getFile().getName().compareTo(o.getBTreeFlushTarget().getFile().getName());
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
index 3a608fe..a4f6875 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeMergeOperation.java
@@ -37,16 +37,18 @@
private final FileReference btreeMergeTarget;
private final FileReference bloomFilterMergeTarget;
private final ILSMIOOperationCallback callback;
+ private final String indexIdentifier;
public LSMBTreeMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
ITreeIndexCursor cursor, FileReference btreeMergeTarget, FileReference bloomFilterMergeTarget,
- ILSMIOOperationCallback callback) {
+ ILSMIOOperationCallback callback, String indexIdentifier) {
this.accessor = accessor;
this.mergingComponents = mergingComponents;
this.cursor = cursor;
this.btreeMergeTarget = btreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
this.callback = callback;
+ this.indexIdentifier = indexIdentifier;
}
@Override
@@ -94,4 +96,14 @@
public List<ILSMComponent> getMergingComponents() {
return mergingComponents;
}
+
+ @Override
+ public String getIndexUniqueIdentifier() {
+ return indexIdentifier;
+ }
+
+ @Override
+ public LSMIOOpertionType getIOOpertionType() {
+ return LSMIOOpertionType.MERGE;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 2c3940f..1638a5d 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -22,6 +22,12 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
public interface ILSMIOOperation extends Callable<Boolean> {
+
+ public enum LSMIOOpertionType {
+ FLUSH,
+ MERGE
+ }
+
public Set<IODeviceHandle> getReadDevices();
public Set<IODeviceHandle> getWriteDevices();
@@ -29,4 +35,8 @@
public Boolean call() throws HyracksDataException, IndexException;
public ILSMIOOperationCallback getCallback();
+
+ public String getIndexUniqueIdentifier();
+
+ public LSMIOOpertionType getIOOpertionType();
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
index 25894f1..b785764 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AsynchronousScheduler.java
@@ -14,24 +14,87 @@
*/
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOpertionType;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
public class AsynchronousScheduler implements ILSMIOOperationScheduler {
+ // Since this is a asynchronous scheduler, we make sure that flush operations coming from the same lsm index
+ // will be executed serially in same order of scheduling the operations. Look at asterix issue 630.
+
public final static AsynchronousScheduler INSTANCE = new AsynchronousScheduler();
private ExecutorService executor;
+ private final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<String, ILSMIOOperation>();
+ private final Map<String, PriorityQueue<ILSMIOOperation>> waitingFlushOperations = new HashMap<String, PriorityQueue<ILSMIOOperation>>();
public void init(ThreadFactory threadFactory) {
- executor = Executors.newCachedThreadPool(threadFactory);
+ // Creating an executor with the same configuration of Executors.newCachedThreadPool.
+ executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(), threadFactory) {
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+ return new LSMIOOperationTask<T>(callable);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ LSMIOOperationTask<Boolean> task = (LSMIOOperationTask<Boolean>) r;
+ ILSMIOOperation executedOp = task.getOperation();
+ String id = executedOp.getIndexUniqueIdentifier();
+ synchronized (this) {
+ runningFlushOperations.remove(id);
+ if (waitingFlushOperations.containsKey(id)) {
+ try {
+ ILSMIOOperation op = waitingFlushOperations.get(id).poll();
+ if (op != null) {
+ scheduleOperation(op);
+ } else {
+ waitingFlushOperations.remove(id);
+ }
+ } catch (HyracksDataException e) {
+ t = e.getCause();
+ }
+ }
+ }
+ }
+ };
}
@Override
public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
- executor.submit(operation);
+ if (operation.getIOOpertionType() == LSMIOOpertionType.MERGE) {
+ executor.submit(operation);
+ } else {
+ String id = operation.getIndexUniqueIdentifier();
+ synchronized (executor) {
+ if (runningFlushOperations.containsKey(id)) {
+ if (waitingFlushOperations.containsKey(id)) {
+ waitingFlushOperations.get(id).offer(operation);
+ } else {
+ PriorityQueue<ILSMIOOperation> q = new PriorityQueue<ILSMIOOperation>();
+ q.offer(operation);
+ waitingFlushOperations.put(id, q);
+ }
+ } else {
+ runningFlushOperations.put(id, operation);
+ executor.submit(operation);
+ }
+ }
+ }
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIOOperationTask.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIOOperationTask.java
new file mode 100644
index 0000000..9743afe
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIOOperationTask.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+
+public class LSMIOOperationTask<T> extends FutureTask<T> {
+ private final ILSMIOOperation operation;
+
+ public LSMIOOperationTask(Callable<T> callable) {
+ super(callable);
+ this.operation = (ILSMIOOperation) callable;
+ }
+
+ public ILSMIOOperation getOperation() {
+ return operation;
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 0600754..93c5a58 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -438,7 +438,7 @@
ioScheduler.scheduleOperation(new LSMInvertedIndexFlushOperation(
new LSMInvertedIndexAccessor(lsmHarness, opCtx), flushingComponent, componentFileRefs
.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
- componentFileRefs.getBloomFilterFileReference(), callback));
+ componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
}
@Override
@@ -543,7 +543,7 @@
ILSMIndexAccessorInternal accessor = new LSMInvertedIndexAccessor(lsmHarness, ctx);
ioScheduler.scheduleOperation(new LSMInvertedIndexMergeOperation(accessor, mergingComponents, cursor,
relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs.getDeleteIndexFileReference(),
- relMergeFileRefs.getBloomFilterFileReference(), callback));
+ relMergeFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
index 45433e7..beb7643 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFlushOperation.java
@@ -28,23 +28,25 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
-public class LSMInvertedIndexFlushOperation implements ILSMIOOperation {
+public class LSMInvertedIndexFlushOperation implements ILSMIOOperation, Comparable<LSMInvertedIndexFlushOperation> {
private final ILSMIndexAccessorInternal accessor;
private final ILSMComponent flushingComponent;
private final FileReference dictBTreeFlushTarget;
private final FileReference deletedKeysBTreeFlushTarget;
private final FileReference bloomFilterFlushTarget;
private final ILSMIOOperationCallback callback;
+ private final String indexIdentifier;
public LSMInvertedIndexFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
FileReference dictBTreeFlushTarget, FileReference deletedKeysBTreeFlushTarget,
- FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback) {
+ FileReference bloomFilterFlushTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
this.accessor = accessor;
this.flushingComponent = flushingComponent;
this.dictBTreeFlushTarget = dictBTreeFlushTarget;
this.deletedKeysBTreeFlushTarget = deletedKeysBTreeFlushTarget;
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
this.callback = callback;
+ this.indexIdentifier = indexIdentifier;
}
@Override
@@ -88,4 +90,19 @@
public ILSMComponent getFlushingComponent() {
return flushingComponent;
}
+
+ @Override
+ public String getIndexUniqueIdentifier() {
+ return indexIdentifier;
+ }
+
+ @Override
+ public LSMIOOpertionType getIOOpertionType() {
+ return LSMIOOpertionType.FLUSH;
+ }
+
+ @Override
+ public int compareTo(LSMInvertedIndexFlushOperation o) {
+ return dictBTreeFlushTarget.getFile().getName().compareTo(o.getDictBTreeFlushTarget().getFile().getName());
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
index 7cd921a..5a1e413 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexMergeOperation.java
@@ -38,10 +38,11 @@
private final FileReference deletedKeysBTreeMergeTarget;
private final FileReference bloomFilterMergeTarget;
private final ILSMIOOperationCallback callback;
+ private final String indexIdentifier;
public LSMInvertedIndexMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
IIndexCursor cursor, FileReference dictBTreeMergeTarget, FileReference deletedKeysBTreeMergeTarget,
- FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback) {
+ FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
this.accessor = accessor;
this.mergingComponents = mergingComponents;
this.cursor = cursor;
@@ -49,6 +50,7 @@
this.deletedKeysBTreeMergeTarget = deletedKeysBTreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
this.callback = callback;
+ this.indexIdentifier = indexIdentifier;
}
@Override
@@ -103,4 +105,14 @@
public List<ILSMComponent> getMergingComponents() {
return mergingComponents;
}
+
+ @Override
+ public String getIndexUniqueIdentifier() {
+ return indexIdentifier;
+ }
+
+ @Override
+ public LSMIOOpertionType getIOOpertionType() {
+ return LSMIOOpertionType.MERGE;
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index 662aa02..3d2cc62 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -177,7 +177,7 @@
LSMRTreeAccessor accessor = new LSMRTreeAccessor(lsmHarness, rctx);
ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, componentFileRefs
.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(), componentFileRefs
- .getBloomFilterFileReference(), callback));
+ .getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
}
@Override
@@ -295,7 +295,8 @@
ILSMIndexAccessorInternal accessor = new LSMRTreeAccessor(lsmHarness, rctx);
ioScheduler.scheduleOperation(new LSMRTreeMergeOperation((ILSMIndexAccessorInternal) accessor,
mergingComponents, cursor, relMergeFileRefs.getInsertIndexFileReference(), relMergeFileRefs
- .getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback));
+ .getDeleteIndexFileReference(), relMergeFileRefs.getBloomFilterFileReference(), callback,
+ fileManager.getBaseDir()));
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
index 18d7a7e..0d40eb4 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFlushOperation.java
@@ -27,7 +27,7 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
-public class LSMRTreeFlushOperation implements ILSMIOOperation {
+public class LSMRTreeFlushOperation implements ILSMIOOperation, Comparable<LSMRTreeFlushOperation> {
private final ILSMIndexAccessorInternal accessor;
private final ILSMComponent flushingComponent;
@@ -35,16 +35,18 @@
private final FileReference btreeFlushTarget;
private final FileReference bloomFilterFlushTarget;
private final ILSMIOOperationCallback callback;
+ private final String indexIdentifier;
public LSMRTreeFlushOperation(ILSMIndexAccessorInternal accessor, ILSMComponent flushingComponent,
FileReference rtreeFlushTarget, FileReference btreeFlushTarget, FileReference bloomFilterFlushTarget,
- ILSMIOOperationCallback callback) {
+ ILSMIOOperationCallback callback, String indexIdentifier) {
this.accessor = accessor;
this.flushingComponent = flushingComponent;
this.rtreeFlushTarget = rtreeFlushTarget;
this.btreeFlushTarget = btreeFlushTarget;
this.bloomFilterFlushTarget = bloomFilterFlushTarget;
this.callback = callback;
+ this.indexIdentifier = indexIdentifier;
}
@Override
@@ -89,4 +91,19 @@
public ILSMComponent getFlushingComponent() {
return flushingComponent;
}
+
+ @Override
+ public String getIndexUniqueIdentifier() {
+ return indexIdentifier;
+ }
+
+ @Override
+ public LSMIOOpertionType getIOOpertionType() {
+ return LSMIOOpertionType.FLUSH;
+ }
+
+ @Override
+ public int compareTo(LSMRTreeFlushOperation o) {
+ return rtreeFlushTarget.getFile().getName().compareTo(o.getRTreeFlushTarget().getFile().getName());
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
index da7a2fb..ddf3ebf 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeMergeOperation.java
@@ -36,10 +36,11 @@
private final FileReference btreeMergeTarget;
private final FileReference bloomFilterMergeTarget;
private final ILSMIOOperationCallback callback;
+ private final String indexIdentifier;
public LSMRTreeMergeOperation(ILSMIndexAccessorInternal accessor, List<ILSMComponent> mergingComponents,
ITreeIndexCursor cursor, FileReference rtreeMergeTarget, FileReference btreeMergeTarget,
- FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback) {
+ FileReference bloomFilterMergeTarget, ILSMIOOperationCallback callback, String indexIdentifier) {
this.accessor = accessor;
this.mergingComponents = mergingComponents;
this.cursor = cursor;
@@ -47,6 +48,7 @@
this.btreeMergeTarget = btreeMergeTarget;
this.bloomFilterMergeTarget = bloomFilterMergeTarget;
this.callback = callback;
+ this.indexIdentifier = indexIdentifier;
}
@Override
@@ -104,4 +106,14 @@
public List<ILSMComponent> getMergingComponents() {
return mergingComponents;
}
+
+ @Override
+ public String getIndexUniqueIdentifier() {
+ return indexIdentifier;
+ }
+
+ @Override
+ public LSMIOOpertionType getIOOpertionType() {
+ return LSMIOOpertionType.MERGE;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
index ec66a39a..bfd0260 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java
@@ -155,7 +155,7 @@
opCtx.getComponentHolder().add(flushingComponent);
ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, opCtx);
ioScheduler.scheduleOperation(new LSMRTreeFlushOperation(accessor, flushingComponent, relFlushFileRefs
- .getInsertIndexFileReference(), null, null, callback));
+ .getInsertIndexFileReference(), null, null, callback, fileManager.getBaseDir()));
}
@Override
@@ -256,7 +256,7 @@
LSMComponentFileReferences relMergeFileRefs = getMergeTargetFileName(mergingComponents);
ILSMIndexAccessorInternal accessor = new LSMRTreeWithAntiMatterTuplesAccessor(lsmHarness, rctx);
ioScheduler.scheduleOperation(new LSMRTreeMergeOperation(accessor, mergingComponents, cursor, relMergeFileRefs
- .getInsertIndexFileReference(), null, null, callback));
+ .getInsertIndexFileReference(), null, null, callback, fileManager.getBaseDir()));
}
@Override
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
index 3591f78a..fb06a7e 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java
@@ -20,10 +20,12 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.ThreadFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
@@ -37,9 +39,9 @@
import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoMergePolicy;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.SynchronousScheduler;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ThreadCountingTracker;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -50,8 +52,8 @@
public class LSMTreeRunner implements IExperimentRunner {
- private static final int MAX_OPEN_FILES = 10000;
- private static final int HYRACKS_FRAME_SIZE = 128;
+ private static final int MAX_OPEN_FILES = Integer.MAX_VALUE;
+ private static final int HYRACKS_FRAME_SIZE = 131072;
protected IHyracksTaskContext ctx;
protected IOManager ioManager;
@@ -61,7 +63,7 @@
protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
protected final static String sep = System.getProperty("file.separator");
- protected final static String classDir = "/lsmtree/";
+ protected final static String classDir = "/tmp/lsmtree/";
protected String onDiskDir;
protected FileReference file;
@@ -71,6 +73,11 @@
protected IBufferCache memBufferCache;
private final int onDiskPageSize;
private final int onDiskNumPages;
+ private final static ThreadFactory threadFactory = new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r);
+ }
+ };
public LSMTreeRunner(int numBatches, int inMemPageSize, int inMemNumPages, int onDiskPageSize, int onDiskNumPages,
ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories, int[] bloomFilterKeyFields,
@@ -93,11 +100,13 @@
List<IVirtualBufferCache> virtualBufferCaches = new ArrayList<IVirtualBufferCache>();
for (int i = 0; i < 2; i++) {
IVirtualBufferCache virtualBufferCache = new VirtualBufferCache(new HeapBufferAllocator(), inMemPageSize,
- inMemNumPages);
+ inMemNumPages / 2);
virtualBufferCaches.add(virtualBufferCache);
}
- this.ioScheduler = SynchronousScheduler.INSTANCE;
+ this.ioScheduler = AsynchronousScheduler.INSTANCE;
+ AsynchronousScheduler.INSTANCE.init(threadFactory);
+
lsmtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fmp, typeTraits, cmpFactories,
bloomFilterKeyFields, bloomFilterFalsePositiveRate, NoMergePolicy.INSTANCE,
new ThreadCountingTracker(), ioScheduler, NoOpIOOperationCallback.INSTANCE);
@@ -133,15 +142,23 @@
@Override
public void reset() throws Exception {
+ try {
+ lsmtree.deactivate();
+ } catch (HyracksDataException e) {
+ // ignore
+ }
+ try {
+ lsmtree.destroy();
+ } catch (HyracksDataException e) {
+ // ignore
+ }
+
lsmtree.create();
+ lsmtree.activate();
}
@Override
public void deinit() throws Exception {
- bufferCache.closeFile(lsmtreeFileId);
- bufferCache.close();
- memBufferCache.closeFile(lsmtreeFileId);
- memBufferCache.close();
}
public class LSMTreeThread extends Thread {
@@ -166,6 +183,7 @@
} catch (TreeIndexException e) {
}
}
+ dataGen.releaseBatch(batch);
}
} catch (Exception e) {
e.printStackTrace();
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java
index 641362e..2354550 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/perf/PerfExperiment.java
@@ -31,18 +31,20 @@
public static void main(String[] args) throws Exception {
// Disable logging so we can better see the output times.
Enumeration<String> loggers = LogManager.getLogManager().getLoggerNames();
- while(loggers.hasMoreElements()) {
+ while (loggers.hasMoreElements()) {
String loggerName = loggers.nextElement();
Logger logger = LogManager.getLogManager().getLogger(loggerName);
logger.setLevel(Level.OFF);
}
-
- int numTuples = 100000; // 100K
+ boolean sorted = Boolean.parseBoolean(args[0]);
+ int numThreads = Integer.parseInt(args[1]);
+
+ //int numTuples = 100000; // 100K
//int numTuples = 1000000; // 1M
//int numTuples = 2000000; // 2M
//int numTuples = 3000000; // 3M
//int numTuples = 10000000; // 10M
- //int numTuples = 20000000; // 20M
+ int numTuples = 20000000; // 20M
//int numTuples = 30000000; // 30M
//int numTuples = 40000000; // 40M
//int numTuples = 60000000; // 60M
@@ -50,27 +52,41 @@
//int numTuples = 200000000; // 200M
int batchSize = 10000;
int numBatches = numTuples / batchSize;
-
+
+ int payLoadSize = 240;
ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE };
- ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes, 30);
-
- IBinaryComparatorFactory[] cmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes, fieldSerdes.length);
-
+ ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes, payLoadSize);
+
+ IBinaryComparatorFactory[] cmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes,
+ fieldSerdes.length);
+ int[] bloomFilterKeyFields = new int[cmpFactories.length];
+ for (int i = 0; i < bloomFilterKeyFields.length; i++) {
+ bloomFilterKeyFields[i] = i;
+ }
+ double bloomFilterFalsePositiveRate = 0.01;
+
//int repeats = 1000;
int repeats = 1;
long[] times = new long[repeats];
- int numThreads = 2;
+// int numThreads = 4;
+// boolean sorted = true;
for (int i = 0; i < repeats; i++) {
//ConcurrentSkipListRunner runner = new ConcurrentSkipListRunner(numBatches, batchSize, tupleSize, typeTraits, cmp);
- InMemoryBTreeRunner runner = new InMemoryBTreeRunner(numBatches, 8192, 100000, typeTraits, cmpFactories);
+ //InMemoryBTreeRunner runner = new InMemoryBTreeRunner(numBatches, 8192, 100000, typeTraits, cmpFactories);
//BTreeBulkLoadRunner runner = new BTreeBulkLoadRunner(numBatches, 8192, 100000, typeTraits, cmp, 1.0f);
- //BTreeRunner runner = new BTreeRunner(numBatches, 8192, 100000, typeTraits, cmp);
- //String btreeName = "071211";
- //BTreeSearchRunner runner = new BTreeSearchRunner(btreeName, 10, numBatches, 8192, 25000, typeTraits, cmp);
- //LSMTreeRunner runner = new LSMTreeRunner(numBatches, 8192, 100, 8192, 250, typeTraits, cmp);
- //LSMTreeSearchRunner runner = new LSMTreeSearchRunner(100000, numBatches, 8192, 24750, 8192, 250, typeTraits, cmp);
- DataGenThread dataGen = new DataGenThread(numThreads, numBatches, batchSize, fieldSerdes, 30, 50, 10, false);
+ //BTreeRunner runner = new BTreeRunner(numBatches, 8192, 100000, typeTraits, cmp);
+ //String btreeName = "071211";
+ //BTreeSearchRunner runner = new BTreeSearchRunner(btreeName, 10, numBatches, 8192, 25000, typeTraits, cmp);
+ //LSMTreeRunner runner = new LSMTreeRunner(numBatches, 8192, 100, 8192, 250, typeTraits, cmp);
+ //LSMTreeSearchRunner runner = new LSMTreeSearchRunner(100000, numBatches, 8192, 24750, 8192, 250, typeTraits, cmp);
+ int inMemPageSize = 131072; // 128kb
+ int onDiskPageSize = inMemPageSize;
+ int inMemNumPages = 8192; // 1GB
+ int onDiskNumPages = 16384; // 2GB
+ LSMTreeRunner runner = new LSMTreeRunner(numBatches, inMemPageSize, inMemNumPages, onDiskPageSize,
+ onDiskNumPages, typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate);
+ DataGenThread dataGen = new DataGenThread(numThreads, numBatches, batchSize, fieldSerdes, payLoadSize, 50, 10, sorted);
dataGen.start();
runner.reset();
times[i] = runner.runExperiment(dataGen, numThreads);