merged hyracks_lsm_tree and fullstack_asterix_stabilization
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_lsm_staging@3026 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-storage-am-lsm-common/pom.xml b/hyracks/hyracks-storage-am-lsm-common/pom.xml
new file mode 100644
index 0000000..ee62131
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/pom.xml
@@ -0,0 +1,47 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-storage-am-lsm-common</artifactId>
+
+ <parent>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ </parent>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-common</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-bloomfilter</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.hyracks</groupId>
+ <artifactId>hyracks-storage-am-btree</artifactId>
+ <version>0.2.3-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IInMemoryBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IInMemoryBufferCache.java
new file mode 100644
index 0000000..082ad2f
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IInMemoryBufferCache.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCacheInternal;
+
+public interface IInMemoryBufferCache extends IBufferCacheInternal {
+ public void open();
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
new file mode 100644
index 0000000..fa00f85
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponent.java
@@ -0,0 +1,10 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
+
+public interface ILSMComponent {
+ public boolean threadEnter(LSMOperationType opType) throws InterruptedException;
+
+ public void threadExit(LSMOperationType opType, boolean failedOperation) throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java
new file mode 100644
index 0000000..1f3a2b7
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFactory.java
@@ -0,0 +1,13 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+
+public interface ILSMComponentFactory {
+ public ILSMComponent createLSMComponentInstance(LSMComponentFileReferences cfr) throws IndexException,
+ HyracksDataException;
+
+ public IBufferCache getBufferCache();
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
new file mode 100644
index 0000000..04d7fa3
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+
+public interface ILSMHarness {
+
+ public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException,
+ IndexException;
+
+ public boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
+ throws HyracksDataException, IndexException;
+
+ public void noOp(ILSMIndexOperationContext ctx) throws HyracksDataException;
+
+ public void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred)
+ throws HyracksDataException, IndexException;
+
+ public void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException;
+
+ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException;
+
+ public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
+ IndexException;
+
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException;
+
+ public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
+ IndexException;
+
+ public void addBulkLoadedComponent(ILSMComponent index) throws HyracksDataException, IndexException;
+
+ public ILSMOperationTracker getOperationTracker();
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
new file mode 100644
index 0000000..15fdc6e
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -0,0 +1,17 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.util.Set;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+
+public interface ILSMIOOperation {
+ public Set<IODeviceHandle> getReadDevices();
+
+ public Set<IODeviceHandle> getWriteDevices();
+
+ public void perform() throws HyracksDataException, IndexException;
+
+ public ILSMIOOperationCallback getCallback();
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
new file mode 100644
index 0000000..bf12583
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
@@ -0,0 +1,14 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ILSMIOOperationCallback {
+ public void beforeOperation() throws HyracksDataException;
+
+ public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+ throws HyracksDataException;
+
+ public void afterFinalize(ILSMComponent newComponent) throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
new file mode 100644
index 0000000..52361ee
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+public interface ILSMIOOperationCallbackFactory extends Serializable {
+ public ILSMIOOperationCallback createIOOperationCallback(Object syncObj);
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackProvider.java
new file mode 100644
index 0000000..b5c9741
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackProvider.java
@@ -0,0 +1,5 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+public interface ILSMIOOperationCallbackProvider {
+ public ILSMIOOperationCallback getIOOperationCallback(ILSMIndex index);
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationScheduler.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationScheduler.java
new file mode 100644
index 0000000..6d96562
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationScheduler.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ILSMIOOperationScheduler {
+ public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerProvider.java
new file mode 100644
index 0000000..9737728
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationSchedulerProvider.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public interface ILSMIOOperationSchedulerProvider extends Serializable {
+ public ILSMIOOperationScheduler getIOScheduler(IHyracksTaskContext ctx);
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
new file mode 100644
index 0000000..cff47bb
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+
+/**
+ * Methods to be implemented by an LSM index, which are called from {@link LSMHarness}.
+ * The implementations of the methods below should be thread agnostic.
+ * Synchronization of LSM operations like updates/searches/flushes/merges is
+ * done by the {@link LSMHarness}. For example, a flush() implementation should only
+ * create and return the new on-disk component, ignoring the fact that
+ * concurrent searches/updates/merges may be ongoing.
+ */
+public interface ILSMIndex extends IIndex {
+
+ public void deactivate(boolean flushOnExit) throws HyracksDataException;
+
+ public ILSMIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+ ISearchOperationCallback searchCallback);
+
+ public boolean getFlushStatus(ILSMIndex index);
+
+ public ILSMOperationTracker getOperationTracker();
+
+ public ILSMIOOperationScheduler getIOScheduler();
+
+ public List<ILSMComponent> getImmutableComponents();
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
new file mode 100644
index 0000000..a85c24b
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+
+/**
+ * Client handle for performing operations
+ * (insert/delete/update/search/diskorderscan/merge/flush) on an {@link ILSMHarness}.
+ * An {@link ILSMIndexAccessor} is not thread safe, but different {@link ILSMIndexAccessor}s
+ * can concurrently operate on the same {@link ILSMIndex} (i.e., the {@link ILSMIndex} must allow
+ * concurrent operations).
+ */
+public interface ILSMIndexAccessor extends IIndexAccessor {
+ public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException;
+
+ public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
+
+ /**
+ * Deletes the tuple from the memory component only.
+ *
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
+ public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ /**
+ * Attempts to insert the given tuple.
+ * If the insert would have to wait for a flush to complete, then this method returns false to
+ * allow the caller to avoid potential deadlock situations.
+ * Otherwise, returns true (insert was successful).
+ *
+ * @param tuple
+ * Tuple to be inserted.
+ * @throws HyracksDataException
+ * If the BufferCache throws while un/pinning or un/latching.
+ * @throws IndexException
+ * If an index-specific constraint is violated, e.g., the key
+ * already exists.
+ */
+ public boolean tryInsert(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ /**
+ * Attempts to delete the given tuple.
+ * If the delete would have to wait for a flush to complete, then this method returns false to
+ * allow the caller to avoid potential deadlock situations.
+ * Otherwise, returns true (delete was successful).
+ *
+ * @param tuple
+ * Tuple to be deleted.
+ * @throws HyracksDataException
+ * If the BufferCache throws while un/pinning or un/latching.
+ * @throws IndexException
+ * If there is no matching tuple in the index.
+ */
+ public boolean tryDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ /**
+ * Attempts to update the given tuple.
+ * If the update would have to wait for a flush to complete, then this method returns false to
+ * allow the caller to avoid potential deadlock situations.
+ * Otherwise, returns true (update was successful).
+ *
+ * @param tuple
+ * Tuple whose match in the index is to be updated with the given
+ * tuple's contents.
+ * @throws HyracksDataException
+ * If the BufferCache throws while un/pinning or un/latching.
+ * @throws IndexException
+ * If there is no matching tuple in the index.
+ */
+ public boolean tryUpdate(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ /**
+ * This operation is only supported by indexes with the notion of a unique key.
+ * If tuple's key already exists, then this operation attempts to perform an update.
+ * Otherwise, it attempts to perform an insert.
+ * If the operation would have to wait for a flush to complete, then this method returns false to
+ * allow the caller to avoid potential deadlock situations.
+ * Otherwise, returns true (insert/update was successful).
+ *
+ * @param tuple
+ * Tuple to be inserted or, if its key already exists, used for the update.
+ * @throws HyracksDataException
+ * If the BufferCache throws while un/pinning or un/latching.
+ * @throws IndexException
+ * If an index-specific constraint is violated.
+ */
+ public boolean tryUpsert(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ public void forcePhysicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ public void forceInsert(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ public void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ /**
+ * This method can be used to increase the number of 'active' operations of an index artificially,
+ * without actually modifying the index.
+ * This method does not block and is guaranteed to trigger the {@link ILSMOperationTracker}'s beforeOperation
+ * and afterOperation calls.
+ *
+ * @throws HyracksDataException
+ */
+ public void noOp() throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessorInternal.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessorInternal.java
new file mode 100644
index 0000000..8c28d47
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexAccessorInternal.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
+
+public interface ILSMIndexAccessorInternal extends ILSMIndexAccessor {
+
+ /**
+ * Force a flush of the in-memory component.
+ *
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
+ public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException;
+
+ /**
+ * Merge all on-disk components.
+ *
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
+ public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException;
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexFileManager.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexFileManager.java
new file mode 100644
index 0000000..bc922fe
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexFileManager.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.util.Comparator;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+
+/**
+ * Provides file names for LSM on-disk components. Also cleans up invalid files.
+ * There are separate methods to get file names for merge and flush because we
+ * need to guarantee the correct order of on-disk components (i.e., the
+ * components produced by flush are always newer than those produced by a
+ * merge).
+ */
+public interface ILSMIndexFileManager {
+ public void createDirs();
+
+ public void deleteDirs();
+
+ public LSMComponentFileReferences getRelFlushFileReference();
+
+ public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
+ throws HyracksDataException;
+
+ public String getBaseDir();
+
+ // Deletes invalid files, and returns list of valid files from baseDir.
+ // The returned valid files are correctly sorted (based on the recency of data).
+ public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException, IndexException;
+
+ public Comparator<String> getFileNameComparator();
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
new file mode 100644
index 0000000..e98165b
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexInternal.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+
+public interface ILSMIndexInternal extends ILSMIndex {
+ public ILSMIndexAccessorInternal createAccessor(IModificationOperationCallback modificationCallback,
+ ISearchOperationCallback searchCallback);
+
+ public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException, IndexException;
+
+ public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
+ throws HyracksDataException, IndexException;
+
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException;
+
+ public ILSMComponent flush(ILSMIOOperation operation) throws HyracksDataException, IndexException;
+
+ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException;
+
+ public ILSMComponent merge(List<ILSMComponent> mergedComponents, ILSMIOOperation operation)
+ throws HyracksDataException, IndexException;
+
+ public void addComponent(ILSMComponent index);
+
+ public void subsumeMergedComponents(ILSMComponent newComponent, List<ILSMComponent> mergedComponents);
+
+ /**
+ * Populates the context's component holder with a snapshot of the components involved in the operation.
+ *
+ * @param ctx
+ * - the operation's context
+ */
+ public void getOperationalComponents(ILSMIndexOperationContext ctx);
+
+ public IInMemoryFreePageManager getInMemoryFreePageManager();
+
+ public List<ILSMComponent> getImmutableComponents();
+
+ public void markAsValid(ILSMComponent lsmComponent) throws HyracksDataException;
+
+ public void setFlushStatus(ILSMIndex index, boolean needsFlush);
+
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
new file mode 100644
index 0000000..864d0e7
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+
+public interface ILSMIndexOperationContext extends IIndexOperationContext {
+ public List<ILSMComponent> getComponentHolder();
+
+ public ISearchOperationCallback getSearchOperationCallback();
+
+ public IModificationOperationCallback getModificationCallback();
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
new file mode 100644
index 0000000..877c6ffd
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+
+public interface ILSMMergePolicy {
+ public void diskComponentAdded(ILSMIndex index, int totalNumDiskComponents) throws HyracksDataException, IndexException;
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java
new file mode 100644
index 0000000..57a9609
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMMergePolicyProvider.java
@@ -0,0 +1,9 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+
+public interface ILSMMergePolicyProvider extends Serializable {
+ public ILSMMergePolicy getMergePolicy(IHyracksTaskContext ctx);
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
new file mode 100644
index 0000000..c3f1f3e
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTracker.java
@@ -0,0 +1,48 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
+
+/**
+ * This interface exposes methods for tracking and setting the status of operations for the purpose
+ * of coordinating flushes/merges in {@link ILSMIndex}.
+ * Note that 'operation' below refers to {@link IIndexAccessor} methods.
+ * Every method receives the operation type plus a search callback and a modification callback.
+ *
+ * @author zheilbron
+ */
+public interface ILSMOperationTracker {
+
+    /**
+     * An {@link ILSMIndex} will call this method before an operation enters it,
+     * i.e., before any latches are taken.
+     * Once this method has returned normally, the operation is considered 'active' in the index.
+     * <p>
+     * NOTE(review): this comment used to describe a {@code tryOperation} flag and a boolean result,
+     * neither of which exists in this signature. Whether an implementation may block here (e.g. while
+     * an operation has to wait for a flush) cannot be told from this interface -- confirm with the implementations.
+     */
+    public void beforeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+            IModificationOperationCallback modificationCallback) throws HyracksDataException;
+
+    /**
+     * An {@link ILSMIndex} will call this method after an operation has left the index,
+     * i.e., after all relevant latches have been released.
+     * After this method has been called, the operation is still considered 'active',
+     * until the issuer of the operation declares it completed by calling
+     * {@link #completeOperation(LSMOperationType, ISearchOperationCallback, IModificationOperationCallback)}.
+     */
+    public void afterOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+            IModificationOperationCallback modificationCallback) throws HyracksDataException;
+
+    /**
+     * This method must be called by whoever is requesting the index operation through an {@link IIndexAccessor}.
+     * The use of this method indicates that the operation is no longer 'active'
+     * for the purpose of coordinating flushes/merges.
+     */
+    public void completeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+            IModificationOperationCallback modificationCallback) throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
new file mode 100644
index 0000000..db7ff6c
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMOperationTrackerFactory.java
@@ -0,0 +1,16 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+/**
+ * Serializable factory for {@link ILSMOperationTracker} instances.
+ */
+public interface ILSMOperationTrackerFactory extends Serializable {
+
+    /**
+     * @param index
+     *            the LSM index the tracker is requested for
+     * @return an operation tracker for {@code index}
+     */
+    ILSMOperationTracker createOperationTracker(ILSMIndex index);
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTreeTupleReference.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTreeTupleReference.java
new file mode 100644
index 0000000..8d82673
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTreeTupleReference.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+
+/**
+ * Tree-index tuple reference that can additionally report whether the referenced tuple is antimatter.
+ */
+public interface ILSMTreeTupleReference extends ITreeIndexTupleReference {
+
+    /**
+     * @return {@code true} if the tuple currently referenced is an antimatter tuple
+     *         (presumably the LSM marker for a deleted key -- confirm with the tuple writers)
+     */
+    boolean isAntimatter();
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelper.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelper.java
new file mode 100644
index 0000000..ea7c3b4
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelper.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+
+/**
+ * Base class for LSM index dataflow helpers. In addition to what {@link IndexDataflowHelper} keeps,
+ * it stores the memory sizing ({@code memPageSize}, {@code memNumPages}) and the LSM collaborators
+ * (merge policy, operation tracker factory, I/O scheduler, I/O callback provider) for use by subclasses.
+ */
+public abstract class AbstractLSMIndexDataflowHelper extends IndexDataflowHelper {
+
+    /** Value used for {@code memPageSize} by the constructor that takes no explicit sizing. */
+    protected static final int DEFAULT_MEM_PAGE_SIZE = 32768;
+    /** Value used for {@code memNumPages} by the constructor that takes no explicit sizing. */
+    protected static final int DEFAULT_MEM_NUM_PAGES = 1000;
+
+    protected final int memPageSize;
+    protected final int memNumPages;
+
+    protected final ILSMMergePolicy mergePolicy;
+    protected final ILSMIOOperationScheduler ioScheduler;
+    protected final ILSMOperationTrackerFactory opTrackerFactory;
+    protected final ILSMIOOperationCallbackProvider ioOpCallbackProvider;
+
+    /**
+     * Creates a helper sized with {@link #DEFAULT_MEM_PAGE_SIZE} and {@link #DEFAULT_MEM_NUM_PAGES}.
+     */
+    public AbstractLSMIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+            ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
+            ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
+        this(opDesc, ctx, partition, DEFAULT_MEM_PAGE_SIZE, DEFAULT_MEM_NUM_PAGES, mergePolicy, opTrackerFactory,
+                ioScheduler, ioOpCallbackProvider);
+    }
+
+    /**
+     * Creates a helper with explicit memory sizing. The arguments are stored as given; no validation
+     * is performed here.
+     */
+    public AbstractLSMIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+            int memPageSize, int memNumPages, ILSMMergePolicy mergePolicy,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
+            ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
+        super(opDesc, ctx, partition);
+        this.memPageSize = memPageSize;
+        this.memNumPages = memNumPages;
+        this.mergePolicy = mergePolicy;
+        this.opTrackerFactory = opTrackerFactory;
+        this.ioScheduler = ioScheduler;
+        this.ioOpCallbackProvider = ioOpCallbackProvider;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
new file mode 100644
index 0000000..a2f2a11
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/AbstractLSMIndexDataflowHelperFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+
+/**
+ * Base class for factories of LSM index dataflow helpers. It only stores the providers/factories
+ * and the memory sizing passed to its constructor so that concrete factories can use them.
+ */
+public abstract class AbstractLSMIndexDataflowHelperFactory implements IIndexDataflowHelperFactory {
+    // Java serialization reads serialVersionUID per class, so concrete factories should declare their own.
+    protected static final long serialVersionUID = 1L;
+
+    protected final ILSMMergePolicyProvider mergePolicyProvider;
+    protected final ILSMOperationTrackerFactory opTrackerFactory;
+    protected final ILSMIOOperationSchedulerProvider ioSchedulerProvider;
+    protected final ILSMIOOperationCallbackProvider ioOpCallbackProvider;
+    protected final int memPageSize;
+    protected final int memNumPages;
+
+    /**
+     * Stores the given collaborators and sizing; nothing is created or validated here.
+     */
+    public AbstractLSMIndexDataflowHelperFactory(ILSMMergePolicyProvider mergePolicyProvider,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationSchedulerProvider ioSchedulerProvider,
+            ILSMIOOperationCallbackProvider ioOpCallbackProvider, int memPageSize, int memNumPages) {
+        this.memPageSize = memPageSize;
+        this.memNumPages = memNumPages;
+        this.mergePolicyProvider = mergePolicyProvider;
+        this.opTrackerFactory = opTrackerFactory;
+        this.ioSchedulerProvider = ioSchedulerProvider;
+        this.ioOpCallbackProvider = ioOpCallbackProvider;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
new file mode 100644
index 0000000..baa9648
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexInsertUpdateDeleteOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+
+/**
+ * Insert/update/delete/upsert operator for LSM indexes. For each tuple of an incoming frame it first calls the
+ * {@code try*} variant of the operation on the {@link ILSMIndexAccessor}. When that returns {@code false},
+ * the tuples of the frame that were already processed are forwarded to the writer and only then the regular
+ * variant is called (presumably because the regular call may wait for a flush, and downstream operators
+ * should see the processed tuples first -- confirm against the accessor implementations).
+ */
+public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertUpdateDeleteOperatorNodePushable {
+
+    /** Created on first use by {@code flushPartialFrame}; assembles partial frames in the write buffer. */
+    protected FrameTupleAppender appender;
+
+    public LSMIndexInsertUpdateDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op) {
+        super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, op);
+    }
+
+    /**
+     * Applies the configured operation to every tuple of {@code buffer} (tuples rejected by the tuple filter
+     * only trigger {@code noOp()}) and forwards every tuple of the frame to the writer exactly once: as the
+     * unmodified frame when no partial flush happened, otherwise as a sequence of partial frames.
+     *
+     * @param buffer
+     *            the incoming frame
+     * @throws HyracksDataException
+     *             if the operation type is unsupported or applying a tuple fails
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        accessor.reset(buffer);
+        ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
+        // Index of the first tuple of this frame that has not been forwarded to the writer yet.
+        int nextFlushTupleIndex = 0;
+        int tupleCount = accessor.getTupleCount();
+        for (int i = 0; i < tupleCount; i++) {
+            try {
+                if (tupleFilter != null) {
+                    frameTuple.reset(accessor, i);
+                    if (!tupleFilter.accept(frameTuple)) {
+                        lsmAccessor.noOp();
+                        continue;
+                    }
+                }
+                tuple.reset(accessor, i);
+
+                switch (op) {
+                    case INSERT: {
+                        if (!lsmAccessor.tryInsert(tuple)) {
+                            // Tuples [nextFlushTupleIndex, i) are done; tuple i itself is forwarded later.
+                            flushPartialFrame(nextFlushTupleIndex, i);
+                            nextFlushTupleIndex = i;
+                            lsmAccessor.insert(tuple);
+                        }
+                        break;
+                    }
+                    case DELETE: {
+                        if (!lsmAccessor.tryDelete(tuple)) {
+                            flushPartialFrame(nextFlushTupleIndex, i);
+                            nextFlushTupleIndex = i;
+                            lsmAccessor.delete(tuple);
+                        }
+                        break;
+                    }
+                    case UPSERT: {
+                        if (!lsmAccessor.tryUpsert(tuple)) {
+                            flushPartialFrame(nextFlushTupleIndex, i);
+                            nextFlushTupleIndex = i;
+                            lsmAccessor.upsert(tuple);
+                        }
+                        break;
+                    }
+                    case UPDATE: {
+                        if (!lsmAccessor.tryUpdate(tuple)) {
+                            flushPartialFrame(nextFlushTupleIndex, i);
+                            nextFlushTupleIndex = i;
+                            lsmAccessor.update(tuple);
+                        }
+                        break;
+                    }
+                    default: {
+                        throw new HyracksDataException("Unsupported operation " + op
+                                + " in tree index InsertUpdateDelete operator");
+                    }
+                }
+            } catch (HyracksDataException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        if (nextFlushTupleIndex == 0) {
+            // Nothing was forwarded yet (no try* call failed, or only for tuple 0). Forward entire frame.
+            System.arraycopy(buffer.array(), 0, writeBuffer.array(), 0, buffer.capacity());
+            FrameUtils.flushFrame(writeBuffer, writer);
+        } else {
+            // Flush remaining partial frame.
+            flushPartialFrame(nextFlushTupleIndex, tupleCount);
+        }
+    }
+
+    /**
+     * Appends tuples {@code [startTupleIndex, endTupleIndex)} of the current frame to the write buffer and
+     * flushes it to the writer. Does nothing when the range is empty.
+     */
+    private void flushPartialFrame(int startTupleIndex, int endTupleIndex) throws HyracksDataException {
+        if (startTupleIndex >= endTupleIndex) {
+            // No tuple to forward; do not emit an empty frame.
+            return;
+        }
+        if (appender == null) {
+            appender = new FrameTupleAppender(ctx.getFrameSize());
+        }
+        appender.reset(writeBuffer, true);
+        for (int i = startTupleIndex; i < endTupleIndex; i++) {
+            if (!appender.append(accessor, i)) {
+                throw new IllegalStateException("Failed to append tuple into frame.");
+            }
+        }
+        FrameUtils.flushFrame(writeBuffer, writer);
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
new file mode 100644
index 0000000..6297576
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
+
+/**
+ * Operator descriptor that applies one {@link IndexOperation} to an LSM tree index through an
+ * {@link LSMIndexInsertUpdateDeleteOperatorNodePushable}.
+ */
+public class LSMTreeIndexInsertUpdateDeleteOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int[] fieldPermutation;
+    private final IndexOperation op;
+
+    /**
+     * Stores {@code fieldPermutation} and {@code op} for the runtime and hands everything else to the
+     * superclass, together with the fixed arguments {@code 1, 1} (presumably input and output arity --
+     * confirm), {@code false}, and the no-op local-resource-factory provider and operation-callback factory.
+     */
+    public LSMTreeIndexInsertUpdateDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
+            IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
+            ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackProvider) {
+        super(spec, 1, 1, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, tupleFilterFactory, false,
+                NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+                modificationOpCallbackProvider);
+        this.op = op;
+        this.fieldPermutation = fieldPermutation;
+    }
+
+    /**
+     * Creates the push runtime for one partition; {@code nPartitions} is not used.
+     */
+    @Override
+    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+        return new LSMIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
+                recordDescProvider, op);
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/DualIndexInMemoryBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/DualIndexInMemoryBufferCache.java
new file mode 100644
index 0000000..c24c473
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/DualIndexInMemoryBufferCache.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.freepage;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.TransientFileMapManager;
+
+/**
+ * In-memory buffer cache that supports two tree indexes.
+ * We assume that the tree indexes have 2 fixed pages, one at index 0 (metadata page), and one at index 1 (root page).
+ * The fixed pages of a file live in slot {@code pageId + 2 * fileId}: slots 0 and 1 for file id 0,
+ * slots 2 and 3 for file id 1. Nothing here checks that only those two file ids are used.
+ */
+public class DualIndexInMemoryBufferCache extends InMemoryBufferCache {
+
+    public DualIndexInMemoryBufferCache(ICacheMemoryAllocator allocator, int pageSize, int numPages) {
+        super(allocator, pageSize, numPages, new TransientFileMapManager());
+    }
+
+    /**
+     * Returns the page for {@code dpid}. Page ids 0 and 1 are resolved per file; any other page id is
+     * resolved by page id alone, which is what {@link InMemoryBufferCache#pin(long, boolean)} does.
+     */
+    @Override
+    public ICachedPage pin(long dpid, boolean newPage) {
+        int pageId = BufferedFileHandle.getPageId(dpid);
+        boolean fixedPage = pageId == 0 || pageId == 1;
+        if (fixedPage && pageId < pages.length) {
+            int fileId = BufferedFileHandle.getFileId(dpid);
+            return pages[pageId + 2 * fileId];
+        }
+        // Regular and overflow pages are not per-file; the parent class resolves both the same way.
+        return super.pin(dpid, newPage);
+    }
+
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/DualIndexInMemoryFreePageManager.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/DualIndexInMemoryFreePageManager.java
new file mode 100644
index 0000000..7a2be7f
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/DualIndexInMemoryFreePageManager.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.freepage;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
+
+/**
+ * In-memory free page manager that supports two tree indexes.
+ * We assume that the tree indexes have 2 fixed pages, one at index 0 (metadata page), and one at index 1 (root page).
+ */
+public class DualIndexInMemoryFreePageManager extends InMemoryFreePageManager {
+
+    // Page ids 0..3 are reserved: a metadata page and a root page for each of the two indexes.
+    private static final int NUM_RESERVED_PAGES = 4;
+    // getFreePage() increments before it returns, so starting here makes the first free page id 4.
+    private static final int INITIAL_PAGE_ID = NUM_RESERVED_PAGES - 1;
+
+    public DualIndexInMemoryFreePageManager(int capacity, ITreeIndexMetaDataFrameFactory metaDataFrameFactory) {
+        super(capacity, metaDataFrameFactory);
+        // The RTree uses the first page as metadata page and the second page as root page;
+        // the BTree uses the third page as metadata page and the fourth page as root page.
+        currentPageId.set(INITIAL_PAGE_ID);
+    }
+
+    /** Restarts page allocation right after the reserved pages; both arguments are ignored. */
+    @Override
+    public void init(ITreeIndexMetaDataFrame metaFrame, int currentMaxPage) throws HyracksDataException {
+        currentPageId.set(INITIAL_PAGE_ID);
+    }
+
+    /** @return the configured capacity minus the four reserved pages */
+    public int getCapacity() {
+        return capacity - NUM_RESERVED_PAGES;
+    }
+
+    /** Restarts page allocation right after the reserved pages. */
+    public void reset() {
+        currentPageId.set(INITIAL_PAGE_ID);
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java
new file mode 100644
index 0000000..66d8ec2
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.freepage;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPageInternal;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * Buffer cache whose pages all live in memory. {@link #open()} allocates a fixed array of
+ * {@code numPages} pages; a page id beyond that array is served from an overflow list that grows on
+ * demand. Pages are looked up by page id only, and unpin, force and flushDirtyPage do nothing.
+ */
+public class InMemoryBufferCache implements IInMemoryBufferCache {
+    protected final ICacheMemoryAllocator allocator;
+    protected final IFileMapManager fileMapManager;
+    protected final int pageSize;
+    protected final int numPages;
+    // Pages with ids >= pages.length; every access in this class holds the list's own monitor.
+    protected final List<CachedPage> overflowPages = new ArrayList<CachedPage>();
+    // Fixed pages; null until open() has been called.
+    protected CachedPage[] pages;
+
+    public InMemoryBufferCache(ICacheMemoryAllocator allocator, int pageSize, int numPages,
+            IFileMapManager fileMapManager) {
+        this.allocator = allocator;
+        this.fileMapManager = fileMapManager;
+        this.pageSize = pageSize;
+        this.numPages = numPages;
+    }
+
+    /** Allocates the fixed pages; the cached page id of each page equals its array index. */
+    public void open() {
+        pages = new CachedPage[numPages];
+        ByteBuffer[] buffers = allocator.allocate(pageSize, numPages);
+        for (int i = 0; i < buffers.length; ++i) {
+            pages[i] = new CachedPage(i, buffers[i]);
+        }
+    }
+
+    /**
+     * Returns the page for the page id encoded in {@code dpid}; {@code newPage} is ignored.
+     * Page ids past the fixed array are served from the overflow list, which is extended as needed.
+     */
+    @Override
+    public ICachedPage pin(long dpid, boolean newPage) {
+        int pageId = BufferedFileHandle.getPageId(dpid);
+        if (pageId < pages.length) {
+            // Common case: Return regular page.
+            return pages[pageId];
+        }
+        // Rare case: Return overflow page, possibly expanding overflow array.
+        synchronized (overflowPages) {
+            int numNewPages = pageId - pages.length - overflowPages.size() + 1;
+            if (numNewPages > 0) {
+                ByteBuffer[] buffers = allocator.allocate(pageSize, numNewPages);
+                for (int i = 0; i < numNewPages; i++) {
+                    CachedPage overflowPage = new CachedPage(pages.length + overflowPages.size(), buffers[i]);
+                    overflowPages.add(overflowPage);
+                }
+            }
+            return overflowPages.get(pageId - pages.length);
+        }
+    }
+
+    /** Same as {@code pin(dpid, false)}. */
+    @Override
+    public ICachedPage tryPin(long dpid) throws HyracksDataException {
+        return pin(dpid, false);
+    }
+
+    @Override
+    public int getPageSize() {
+        return pageSize;
+    }
+
+    /** @return the number of fixed pages; overflow pages are not counted */
+    @Override
+    public int getNumPages() {
+        return numPages;
+    }
+
+    /**
+     * Looks up a page by cached page id. Ids below the fixed page count refer to fixed pages, higher ids to
+     * overflow pages (which are created with ids starting at the fixed page count, see {@link #pin}).
+     *
+     * @throws IndexOutOfBoundsException
+     *             if no page with that id exists
+     */
+    @Override
+    public ICachedPageInternal getPage(int cpid) {
+        if (cpid < pages.length) {
+            return pages[cpid];
+        }
+        synchronized (overflowPages) {
+            return overflowPages.get(cpid - pages.length);
+        }
+    }
+
+    public int getNumOverflowPages() {
+        synchronized (overflowPages) {
+            return overflowPages.size();
+        }
+    }
+
+    /** Registers {@code fileRef} with the file map manager; nothing else is done here. */
+    @Override
+    public void createFile(FileReference fileRef) throws HyracksDataException {
+        synchronized (fileMapManager) {
+            fileMapManager.registerFile(fileRef);
+        }
+    }
+
+    @Override
+    public void openFile(int fileId) throws HyracksDataException {
+        // Do nothing.
+    }
+
+    @Override
+    public void closeFile(int fileId) throws HyracksDataException {
+        // Do nothing.
+    }
+
+    /** Unregisters the file from the file map manager; {@code flushDirtyPages} is ignored. */
+    @Override
+    public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException {
+        synchronized (fileMapManager) {
+            fileMapManager.unregisterFile(fileId);
+        }
+    }
+
+    @Override
+    public void unpin(ICachedPage page) throws HyracksDataException {
+        // Do Nothing.
+    }
+
+    /** Drops all fixed and overflow pages. Safe to call even if {@link #open()} was never called. */
+    @Override
+    public void close() {
+        if (pages != null) {
+            for (int i = 0; i < pages.length; ++i) {
+                pages[i] = null;
+            }
+        }
+        synchronized (overflowPages) {
+            overflowPages.clear();
+        }
+    }
+
+    /** Page backed by a {@link ByteBuffer} and latched with a fair {@link ReentrantReadWriteLock}. */
+    public class CachedPage implements ICachedPageInternal {
+        private final int cpid;
+        private final ByteBuffer buffer;
+        private final ReadWriteLock latch;
+
+        public CachedPage(int cpid, ByteBuffer buffer) {
+            this.cpid = cpid;
+            this.buffer = buffer;
+            latch = new ReentrantReadWriteLock(true);
+        }
+
+        @Override
+        public ByteBuffer getBuffer() {
+            return buffer;
+        }
+
+        @Override
+        public Object getReplacementStrategyObject() {
+            // Do nothing.
+            return null;
+        }
+
+        @Override
+        public boolean pinIfGoodVictim() {
+            // Do nothing.
+            return false;
+        }
+
+        @Override
+        public int getCachedPageId() {
+            return cpid;
+        }
+
+        @Override
+        public void acquireReadLatch() {
+            latch.readLock().lock();
+        }
+
+        @Override
+        public void acquireWriteLatch() {
+            latch.writeLock().lock();
+        }
+
+        @Override
+        public void releaseReadLatch() {
+            latch.readLock().unlock();
+        }
+
+        @Override
+        public void releaseWriteLatch() {
+            latch.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void force(int fileId, boolean metadata) throws HyracksDataException {
+        // Do nothing.
+    }
+
+    @Override
+    public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+        // Do nothing.
+    }
+
+    public IFileMapProvider getFileMapProvider() {
+        return fileMapManager;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryFreePageManager.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryFreePageManager.java
new file mode 100644
index 0000000..c601a9b
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryFreePageManager.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.freepage;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
+
+/**
+ * Free page manager for an in-memory tree index. Page ids are handed out from a
+ * monotonically increasing counter and are never recycled; {@link #addFreePage}
+ * is a no-op.
+ */
+public class InMemoryFreePageManager implements IInMemoryFreePageManager {
+    // Counter value before any free page has been handed out. The BTree uses the
+    // first page as metadata page and the second as root page, so the first free page is 2.
+    private static final int INITIAL_PAGE_ID = 1;
+
+    protected final int capacity;
+    protected final AtomicInteger currentPageId = new AtomicInteger(INITIAL_PAGE_ID);
+    protected final ITreeIndexMetaDataFrameFactory metaDataFrameFactory;
+
+    /**
+     * @param capacity page count against which {@link #isFull()} compares the page counter
+     * @param metaDataFrameFactory factory returned by {@link #getMetaDataFrameFactory()}
+     */
+    public InMemoryFreePageManager(int capacity, ITreeIndexMetaDataFrameFactory metaDataFrameFactory) {
+        this.capacity = capacity;
+        this.metaDataFrameFactory = metaDataFrameFactory;
+    }
+
+    /**
+     * Hands out the next unused page id. The first call after construction,
+     * {@link #init} or {@link #reset()} returns 2.
+     */
+    @Override
+    public int getFreePage(ITreeIndexMetaDataFrame metaFrame) throws HyracksDataException {
+        return currentPageId.incrementAndGet();
+    }
+
+    /** Returns the highest page id handed out so far (1 if none has been). */
+    @Override
+    public int getMaxPage(ITreeIndexMetaDataFrame metaFrame) throws HyracksDataException {
+        return currentPageId.get();
+    }
+
+    /** Rewinds the page counter; both arguments are ignored. */
+    @Override
+    public void init(ITreeIndexMetaDataFrame metaFrame, int currentMaxPage) throws HyracksDataException {
+        currentPageId.set(INITIAL_PAGE_ID);
+    }
+
+    @Override
+    public ITreeIndexMetaDataFrameFactory getMetaDataFrameFactory() {
+        return metaDataFrameFactory;
+    }
+
+    /** Returns the configured capacity minus two (presumably the metadata and root pages). */
+    public int getCapacity() {
+        return capacity - 2;
+    }
+
+    /** Rewinds the page counter so that page ids are handed out from 2 again. */
+    public void reset() {
+        currentPageId.set(INITIAL_PAGE_ID);
+    }
+
+    /** Returns true once the page counter has reached the configured capacity. */
+    public boolean isFull() {
+        final int highestPageId = currentPageId.get();
+        return highestPageId >= capacity;
+    }
+
+    /** No-op: returned pages are not tracked. */
+    @Override
+    public void addFreePage(ITreeIndexMetaDataFrame metaFrame, int freePage) throws HyracksDataException {
+    }
+
+    @Override
+    public byte getMetaPageLevelIndicator() {
+        return 0;
+    }
+
+    @Override
+    public byte getFreePageLevelIndicator() {
+        return 0;
+    }
+
+    @Override
+    public boolean isMetaPage(ITreeIndexMetaDataFrame metaFrame) {
+        return false;
+    }
+
+    @Override
+    public boolean isFreePage(ITreeIndexMetaDataFrame metaFrame) {
+        return false;
+    }
+
+    @Override
+    public int getFirstMetadataPage() {
+        // Method doesn't make sense for this free page manager.
+        return -1;
+    }
+
+    @Override
+    public void open(int fileId) {
+        // Method doesn't make sense for this free page manager.
+    }
+
+    @Override
+    public void close() {
+        // Method doesn't make sense for this free page manager.
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java
new file mode 100644
index 0000000..b6fc2f7
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractImmutableLSMComponent.java
@@ -0,0 +1,87 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+
+/**
+ * Base class for LSM components that are only read. It counts the threads
+ * currently inside the component and destroys the component once a merge has
+ * claimed it and the last of those threads has left.
+ */
+public abstract class AbstractImmutableLSMComponent implements ILSMComponent {
+
+    private enum ComponentState {
+        READABLE,
+        READABLE_MERGING,
+        KILLED
+    }
+
+    private ComponentState state = ComponentState.READABLE;
+    private int readerCount = 0;
+
+    public AbstractImmutableLSMComponent() {
+    }
+
+    /**
+     * Registers the calling thread with this component.
+     *
+     * @return false if the component has been killed, or if a merge is
+     *         requested while another merge already holds it; true otherwise
+     */
+    @Override
+    public synchronized boolean threadEnter(LSMOperationType opType) {
+        if (state == ComponentState.KILLED) {
+            return false;
+        }
+
+        switch (opType) {
+            case MERGE:
+                // Only one merge may own the component at a time.
+                if (state == ComponentState.READABLE_MERGING) {
+                    return false;
+                }
+                state = ComponentState.READABLE_MERGING;
+                break;
+            case MODIFICATION:
+            case SEARCH:
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported operation " + opType);
+        }
+        // Every admitted operation, the merge included, counts as a reader.
+        readerCount++;
+        return true;
+    }
+
+    /**
+     * Unregisters the calling thread. A failed merge puts the component back
+     * into the readable state. When the last thread leaves a component that a
+     * merge has claimed, the component is destroyed and marked as killed.
+     */
+    @Override
+    public synchronized void threadExit(LSMOperationType opType, boolean failedOperation) throws HyracksDataException {
+        switch (opType) {
+            case MERGE:
+            case MODIFICATION:
+            case SEARCH:
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported operation " + opType);
+        }
+
+        if (opType == LSMOperationType.MERGE && failedOperation) {
+            state = ComponentState.READABLE;
+        }
+
+        readerCount--;
+        final boolean lastThreadLeft = readerCount == 0;
+        if (lastThreadLeft && state == ComponentState.READABLE_MERGING) {
+            destroy();
+            state = ComponentState.KILLED;
+        }
+    }
+
+    /** Invoked by {@link #threadExit} right before the component is marked as killed. */
+    protected abstract void destroy() throws HyracksDataException;
+
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
new file mode 100644
index 0000000..0c6b9ab
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * Common state and helpers shared by LSM index implementations: the harness,
+ * the I/O scheduler and callback provider, the in-memory free page manager,
+ * the disk buffer cache and file manager, and the current list of immutable
+ * components.
+ */
+public abstract class AbstractLSMIndex implements ILSMIndexInternal {
+    protected final static double MAX_BLOOM_FILTER_ACCEPTABLE_FALSE_POSITIVE_RATE = 0.1;
+
+    protected final ILSMHarness lsmHarness;
+
+    protected final ILSMIOOperationScheduler ioScheduler;
+    protected final ILSMIOOperationCallbackProvider ioOpCallbackProvider;
+
+    // In-memory components.
+    protected final IInMemoryFreePageManager memFreePageManager;
+
+    // On-disk components.
+    protected final IBufferCache diskBufferCache;
+    protected final ILSMIndexFileManager fileManager;
+    protected final IFileMapProvider diskFileMapProvider;
+    // Current component list; updates publish a new list rather than mutating the old one.
+    protected final AtomicReference<List<ILSMComponent>> componentsRef;
+
+    protected boolean isActivated;
+
+    private boolean needsFlush = false;
+
+    public AbstractLSMIndex(IInMemoryFreePageManager memFreePageManager, IBufferCache diskBufferCache,
+            ILSMIndexFileManager fileManager, IFileMapProvider diskFileMapProvider, ILSMMergePolicy mergePolicy,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler,
+            ILSMIOOperationCallbackProvider ioOpCallbackProvider) {
+        this.memFreePageManager = memFreePageManager;
+        this.diskBufferCache = diskBufferCache;
+        this.diskFileMapProvider = diskFileMapProvider;
+        this.fileManager = fileManager;
+        this.ioScheduler = ioScheduler;
+        this.ioOpCallbackProvider = ioOpCallbackProvider;
+        ILSMOperationTracker opTracker = opTrackerFactory.createOperationTracker(this);
+        this.lsmHarness = new LSMHarness(this, mergePolicy, opTracker);
+        this.isActivated = false;
+        this.componentsRef = new AtomicReference<List<ILSMComponent>>(new LinkedList<ILSMComponent>());
+    }
+
+    /**
+     * Flushes the cached pages of the given tree, from page 0 up to the max page
+     * reported by its free page manager, and then forces the file to disk.
+     */
+    protected void forceFlushDirtyPages(ITreeIndex treeIndex) throws HyracksDataException {
+        final int treeFileId = treeIndex.getFileId();
+        final IBufferCache treeBufferCache = treeIndex.getBufferCache();
+        // By default, metadata and data are flushed asynchronously in the buffercache.
+        // This means that the flush issues writes to the OS, but the data may still lie in filesystem buffers.
+        final ITreeIndexMetaDataFrame metaFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory()
+                .createFrame();
+        final int lastPageId = treeIndex.getFreePageManager().getMaxPage(metaFrame);
+        forceFlushDirtyPages(treeBufferCache, treeFileId, 0, lastPageId);
+    }
+
+    /**
+     * Flushes the cached pages {@code startPageId..endPageId} (both inclusive)
+     * of the given file and then forces the file to disk.
+     */
+    protected void forceFlushDirtyPages(IBufferCache bufferCache, int fileId, int startPageId, int endPageId)
+            throws HyracksDataException {
+        for (int pageId = startPageId; pageId <= endPageId; pageId++) {
+            ICachedPage cachedPage = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, pageId));
+            // If tryPin returns null, it means the page is not cached, and therefore cannot be dirty.
+            if (cachedPage != null) {
+                try {
+                    bufferCache.flushDirtyPage(cachedPage);
+                } finally {
+                    bufferCache.unpin(cachedPage);
+                }
+            }
+        }
+        // Forces all pages of given file to disk. This guarantees the data makes it to disk.
+        bufferCache.force(fileId, true);
+    }
+
+    /**
+     * Marks the tree's first metadata page as valid, flushes that page and
+     * forces the file to disk.
+     */
+    protected void markAsValidInternal(ITreeIndex treeIndex) throws HyracksDataException {
+        final int treeFileId = treeIndex.getFileId();
+        final IBufferCache treeBufferCache = treeIndex.getBufferCache();
+        final ITreeIndexMetaDataFrame metaFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory()
+                .createFrame();
+        // Mark the component as a valid component by flushing the metadata page to disk
+        final int metaPageId = treeIndex.getFreePageManager().getFirstMetadataPage();
+        final ICachedPage metaPage = treeBufferCache.pin(BufferedFileHandle.getDiskPageId(treeFileId, metaPageId),
+                false);
+        metaPage.acquireWriteLatch();
+        try {
+            metaFrame.setPage(metaPage);
+            metaFrame.setValid(true);
+
+            // Flush the single modified page to disk.
+            treeBufferCache.flushDirtyPage(metaPage);
+
+            // Force modified metadata page to disk.
+            treeBufferCache.force(treeFileId, true);
+        } finally {
+            metaPage.releaseWriteLatch();
+            treeBufferCache.unpin(metaPage);
+        }
+    }
+
+    /** Publishes a new component list with {@code c} placed in front of the existing components. */
+    @Override
+    public void addComponent(ILSMComponent c) {
+        List<ILSMComponent> current = componentsRef.get();
+        List<ILSMComponent> updated = new ArrayList<ILSMComponent>();
+        updated.add(c);
+        updated.addAll(current);
+        componentsRef.set(updated);
+    }
+
+    /**
+     * Publishes a new component list in which the run of {@code mergedComponents}
+     * is replaced by {@code newComponent}. The run is located through the
+     * position of the first merged component in the current list.
+     */
+    @Override
+    public void subsumeMergedComponents(ILSMComponent newComponent, List<ILSMComponent> mergedComponents) {
+        List<ILSMComponent> current = componentsRef.get();
+        List<ILSMComponent> updated = new ArrayList<ILSMComponent>();
+        int runStart = current.indexOf(mergedComponents.get(0));
+        int runEnd = runStart + mergedComponents.size();
+        int position = 0;
+        for (ILSMComponent component : current) {
+            if (position == runStart) {
+                updated.add(newComponent);
+            } else if (position < runStart || position >= runEnd) {
+                updated.add(component);
+            }
+            position++;
+        }
+        componentsRef.set(updated);
+    }
+
+    @Override
+    public IInMemoryFreePageManager getInMemoryFreePageManager() {
+        return memFreePageManager;
+    }
+
+    @Override
+    public List<ILSMComponent> getImmutableComponents() {
+        return componentsRef.get();
+    }
+
+    /** Records the flush flag; the {@code index} argument is not used. */
+    @Override
+    public void setFlushStatus(ILSMIndex index, boolean needsFlush) {
+        this.needsFlush = needsFlush;
+    }
+
+    /** Returns the flag last stored by {@link #setFlushStatus}; the {@code index} argument is not used. */
+    @Override
+    public boolean getFlushStatus(ILSMIndex index) {
+        return needsFlush;
+    }
+
+    @Override
+    public ILSMOperationTracker getOperationTracker() {
+        return lsmHarness.getOperationTracker();
+    }
+
+    @Override
+    public ILSMIOOperationScheduler getIOScheduler() {
+        return ioScheduler;
+    }
+
+    @Override
+    public IBufferCache getBufferCache() {
+        return diskBufferCache;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
new file mode 100644
index 0000000..a84f8c9
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -0,0 +1,341 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IODeviceHandle;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * Base class for LSM index file managers. Component files live under
+ * {@code baseDir} on every I/O device of the {@link IIOManager} and are named
+ * {@code <begin timestamp>_<end timestamp>}.
+ */
+public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManager {
+
+    protected static final String SPLIT_STRING = "_";
+    protected static final String BLOOM_FILTER_STRING = "f";
+
+    // Use all IODevices registered in ioManager in a round-robin fashion to choose
+    // where to flush and merge
+    protected final IIOManager ioManager;
+    protected final IFileMapProvider fileMapProvider;
+
+    // baseDir should reflect dataset name and partition name.
+    protected String baseDir;
+    protected final Format formatter = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS");
+    protected final Comparator<String> cmp = new FileNameComparator();
+    protected final Comparator<ComparableFileName> recencyCmp = new RecencyComparator();
+
+    protected final TreeIndexFactory<? extends ITreeIndex> treeFactory;
+
+    // The current index for the round-robin file assignment
+    private int ioDeviceIndex = 0;
+
+    /**
+     * @param file its path becomes {@code baseDir}; a trailing file separator is appended if missing
+     * @param startIODeviceIndex device at which the round-robin assignment starts, taken modulo the
+     *            number of I/O devices
+     */
+    public AbstractLSMIndexFileManager(IIOManager ioManager, IFileMapProvider fileMapProvider, FileReference file,
+            TreeIndexFactory<? extends ITreeIndex> treeFactory, int startIODeviceIndex) {
+        this.baseDir = file.getFile().getPath();
+        if (!baseDir.endsWith(System.getProperty("file.separator"))) {
+            baseDir += System.getProperty("file.separator");
+        }
+        this.fileMapProvider = fileMapProvider;
+        this.ioManager = ioManager;
+        this.treeFactory = treeFactory;
+        ioDeviceIndex = startIODeviceIndex % ioManager.getIODevices().size();
+    }
+
+    // Accepts every file whose name does not start with a dot.
+    private static FilenameFilter fileNameFilter = new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".");
+        }
+    };
+
+    /**
+     * Activates the tree, reads the valid flag from its first metadata page and
+     * deactivates the tree again.
+     *
+     * @return the value of the metadata frame's valid flag
+     */
+    protected boolean isValidTreeIndex(ITreeIndex treeIndex) throws HyracksDataException {
+        IBufferCache bufferCache = treeIndex.getBufferCache();
+        treeIndex.activate();
+        try {
+            int metadataPage = treeIndex.getFreePageManager().getFirstMetadataPage();
+            ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory()
+                    .createFrame();
+            ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(treeIndex.getFileId(), metadataPage),
+                    false);
+            page.acquireReadLatch();
+            try {
+                metadataFrame.setPage(page);
+                return metadataFrame.isValid();
+            } finally {
+                page.releaseReadLatch();
+                bufferCache.unpin(page);
+            }
+        } finally {
+            treeIndex.deactivate();
+        }
+    }
+
+    /**
+     * Collects the files under {@code baseDir} on the given device that pass
+     * {@code filter}. When a tree factory is given, files whose tree index is
+     * not valid are deleted instead of being collected.
+     */
+    protected void cleanupAndGetValidFilesInternal(IODeviceHandle dev, FilenameFilter filter,
+            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles)
+            throws HyracksDataException, IndexException {
+        File dir = new File(dev.getPath(), baseDir);
+        String[] files = dir.list(filter);
+        if (files == null) {
+            // File.list() returns null when the directory does not exist or cannot
+            // be read; there is nothing to collect on this device then.
+            return;
+        }
+        for (String fileName : files) {
+            File file = new File(dir.getPath() + File.separator + fileName);
+            FileReference fileRef = new FileReference(file);
+            if (treeFactory == null || isValidTreeIndex(treeFactory.createIndexInstance(fileRef))) {
+                allFiles.add(new ComparableFileName(fileRef));
+            } else {
+                file.delete();
+            }
+        }
+    }
+
+    @Override
+    public void createDirs() {
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
+            File f = new File(dev.getPath(), baseDir);
+            f.mkdirs();
+        }
+    }
+
+    @Override
+    public void deleteDirs() {
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
+            File f = new File(dev.getPath(), baseDir);
+            delete(f);
+        }
+    }
+
+    // Deletes f; directories are emptied recursively first.
+    private void delete(File f) {
+        if (f.isDirectory()) {
+            File[] children = f.listFiles();
+            // File.listFiles() returns null on an I/O error.
+            if (children != null) {
+                for (File c : children) {
+                    delete(c);
+                }
+            }
+        }
+        f.delete();
+    }
+
+    // Accepts non-hidden files whose name ends with BLOOM_FILTER_STRING.
+    protected static FilenameFilter bloomFilterFilter = new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".") && name.endsWith(BLOOM_FILTER_STRING);
+        }
+    };
+
+    protected FileReference createFlushFile(String relFlushFileName) {
+        // Assigns new files to I/O devices in round-robin fashion.
+        IODeviceHandle dev = ioManager.getIODevices().get(ioDeviceIndex);
+        ioDeviceIndex = (ioDeviceIndex + 1) % ioManager.getIODevices().size();
+        return dev.createFileReference(relFlushFileName);
+    }
+
+    protected FileReference createMergeFile(String relMergeFileName) {
+        return createFlushFile(relMergeFileName);
+    }
+
+    @Override
+    public LSMComponentFileReferences getRelFlushFileReference() {
+        Date date = new Date();
+        String ts = formatter.format(date);
+        // Begin timestamp and end timestamp are identical since it is a flush
+        return new LSMComponentFileReferences(createFlushFile(baseDir + ts + SPLIT_STRING + ts), null, null);
+    }
+
+    @Override
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
+            throws HyracksDataException {
+        String[] firstTimestampRange = firstFileName.split(SPLIT_STRING);
+        String[] lastTimestampRange = lastFileName.split(SPLIT_STRING);
+        // Get the range of timestamps by taking the earliest and the latest timestamps
+        return new LSMComponentFileReferences(createMergeFile(baseDir + firstTimestampRange[0] + SPLIT_STRING
+                + lastTimestampRange[1]), null, null);
+    }
+
+    /**
+     * Gathers the component files from all I/O devices, deletes the ones that
+     * are invalid, and returns the remaining ones ordered from newest to oldest.
+     *
+     * @throws HyracksDataException if two files have overlapping timestamp
+     *             intervals and neither interval contains the other
+     */
+    @Override
+    public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException, IndexException {
+        List<LSMComponentFileReferences> validFiles = new ArrayList<LSMComponentFileReferences>();
+        ArrayList<ComparableFileName> allFiles = new ArrayList<ComparableFileName>();
+
+        // Gather files from all IODeviceHandles and delete invalid files
+        // There are two types of invalid files:
+        // (1) The isValid flag is not set
+        // (2) The file's interval is contained by some other file
+        // Here, we only filter out (1).
+        for (IODeviceHandle dev : ioManager.getIODevices()) {
+            cleanupAndGetValidFilesInternal(dev, fileNameFilter, treeFactory, allFiles);
+        }
+
+        if (allFiles.isEmpty()) {
+            return validFiles;
+        }
+
+        if (allFiles.size() == 1) {
+            validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null, null));
+            return validFiles;
+        }
+
+        // Sorts files names from earliest to latest timestamp.
+        Collections.sort(allFiles);
+
+        List<ComparableFileName> validComparableFiles = new ArrayList<ComparableFileName>();
+        ComparableFileName last = allFiles.get(0);
+        validComparableFiles.add(last);
+        for (int i = 1; i < allFiles.size(); i++) {
+            ComparableFileName current = allFiles.get(i);
+            // The current start timestamp is greater than last stop timestamp so current is valid.
+            if (current.interval[0].compareTo(last.interval[1]) > 0) {
+                validComparableFiles.add(current);
+                last = current;
+            } else if (current.interval[0].compareTo(last.interval[0]) >= 0
+                    && current.interval[1].compareTo(last.interval[1]) <= 0) {
+                // The current file is completely contained in the interval of the
+                // last file. Thus the last file must contain at least as much information
+                // as the current file, so delete the current file.
+                current.fileRef.delete();
+            } else {
+                // This scenario should not be possible since timestamps are monotonically increasing.
+                throw new HyracksDataException("Found LSM files with overlapping timestamp intervals, "
+                        + "but the intervals were not contained by another file.");
+            }
+        }
+
+        // Sort valid files in reverse lexicographical order, such that newer files come first.
+        Collections.sort(validComparableFiles, recencyCmp);
+        for (ComparableFileName cmpFileName : validComparableFiles) {
+            validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null, null));
+        }
+
+        return validFiles;
+    }
+
+    @Override
+    public Comparator<String> getFileNameComparator() {
+        return cmp;
+    }
+
+    /**
+     * Sorts strings in reverse lexicographical order. The way we construct the
+     * file names above guarantees that:
+     * 1. Flushed files sort lower than merged files
+     * 2. Flushed files are sorted from newest to oldest (based on the timestamp
+     * string)
+     */
+    private class FileNameComparator implements Comparator<String> {
+        @Override
+        public int compare(String a, String b) {
+            // Consciously ignoring locale. Operands are swapped to reverse the order.
+            return b.compareTo(a);
+        }
+    }
+
+    @Override
+    public String getBaseDir() {
+        return baseDir;
+    }
+
+    /**
+     * A component file together with the timestamp interval parsed from its
+     * name. The natural order is by ascending begin timestamp; for equal begin
+     * timestamps the file with the later end timestamp comes first.
+     */
+    protected class ComparableFileName implements Comparable<ComparableFileName> {
+        public final FileReference fileRef;
+        public final String fullPath;
+        public final String fileName;
+
+        // Timestamp interval.
+        public final String[] interval;
+
+        public ComparableFileName(FileReference fileRef) {
+            this.fileRef = fileRef;
+            this.fullPath = fileRef.getFile().getAbsolutePath();
+            this.fileName = fileRef.getFile().getName();
+            interval = fileName.split(SPLIT_STRING);
+        }
+
+        @Override
+        public int compareTo(ComparableFileName b) {
+            int startCmp = interval[0].compareTo(b.interval[0]);
+            if (startCmp != 0) {
+                return startCmp;
+            }
+            return b.interval[1].compareTo(interval[1]);
+        }
+    }
+
+    // Orders files from newest to oldest: descending begin timestamp, then descending end timestamp.
+    private class RecencyComparator implements Comparator<ComparableFileName> {
+        @Override
+        public int compare(ComparableFileName a, ComparableFileName b) {
+            int cmp = b.interval[0].compareTo(a.interval[0]);
+            if (cmp != 0) {
+                return cmp;
+            }
+            return b.interval[1].compareTo(a.interval[1]);
+        }
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
new file mode 100644
index 0000000..1a6636a
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractMutableLSMComponent.java
@@ -0,0 +1,131 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+
+/**
+ * Base class for an LSM component that accepts modifications. It tracks how
+ * many reader and writer threads are inside the component and moves the
+ * component through the {@link ComponentState} values as operations enter
+ * and exit.
+ */
+public abstract class AbstractMutableLSMComponent implements ILSMComponent {
+
+    private enum ComponentState {
+        READABLE_WRITABLE,
+        READABLE_UNWRITABLE,
+        READABLE_UNWRITABLE_FLUSHING,
+        UNREADABLE_UNWRITABLE
+    }
+
+    private int readerCount = 0;
+    private int writerCount = 0;
+    private ComponentState state = ComponentState.READABLE_WRITABLE;
+
+    public AbstractMutableLSMComponent() {
+    }
+
+    /**
+     * Registers the calling thread with this component. A flush marks the
+     * component as flushing first and then blocks until every writer that is
+     * already inside has exited.
+     *
+     * @return true if the operation was admitted, false if the current state
+     *         does not allow it
+     * @throws InterruptedException if interrupted while a flush waits for writers
+     */
+    @Override
+    public synchronized boolean threadEnter(LSMOperationType opType) throws InterruptedException {
+        switch (opType) {
+            case FORCE_MODIFICATION:
+                // Forced writers are also admitted after the component became unwritable.
+                if (state == ComponentState.READABLE_WRITABLE || state == ComponentState.READABLE_UNWRITABLE) {
+                    writerCount++;
+                    return true;
+                }
+                return false;
+            case MODIFICATION:
+                if (state == ComponentState.READABLE_WRITABLE) {
+                    writerCount++;
+                    return true;
+                }
+                return false;
+            case SEARCH:
+                if (state != ComponentState.UNREADABLE_UNWRITABLE) {
+                    readerCount++;
+                    return true;
+                }
+                return false;
+            case FLUSH:
+                if (state == ComponentState.READABLE_UNWRITABLE_FLUSHING
+                        || state == ComponentState.UNREADABLE_UNWRITABLE) {
+                    return false;
+                }
+                // Changing the state first keeps new writers out while we wait.
+                state = ComponentState.READABLE_UNWRITABLE_FLUSHING;
+                while (writerCount > 0) {
+                    wait();
+                }
+                readerCount++;
+                return true;
+            default:
+                throw new UnsupportedOperationException("Unsupported operation " + opType);
+        }
+    }
+
+    /**
+     * Unregisters the calling thread, applies the resulting state transition
+     * and wakes up all threads waiting on this component.
+     */
+    @Override
+    public synchronized void threadExit(LSMOperationType opType, boolean failedOperation) throws HyracksDataException {
+        switch (opType) {
+            case FORCE_MODIFICATION:
+            case MODIFICATION:
+                writerCount--;
+                markUnwritableIfFull();
+                break;
+            case SEARCH:
+                readerCount--;
+                if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) {
+                    // The last reader of an unreadable component recycles it.
+                    recycle();
+                } else {
+                    markUnwritableIfFull();
+                }
+                break;
+            case FLUSH:
+                if (failedOperation) {
+                    state = isFull() ? ComponentState.READABLE_UNWRITABLE : ComponentState.READABLE_WRITABLE;
+                }
+                readerCount--;
+                if (readerCount == 0) {
+                    recycle();
+                } else if (state == ComponentState.READABLE_UNWRITABLE_FLUSHING) {
+                    // Readers remain inside; the last one to leave recycles the component.
+                    state = ComponentState.UNREADABLE_UNWRITABLE;
+                }
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported operation " + opType);
+        }
+        notifyAll();
+    }
+
+    /** Stops accepting regular modifications once a writable component reports that it is full. */
+    private void markUnwritableIfFull() {
+        if (state == ComponentState.READABLE_WRITABLE && isFull()) {
+            state = ComponentState.READABLE_UNWRITABLE;
+        }
+    }
+
+    /** Resets the component and makes it readable and writable again. */
+    private void recycle() throws HyracksDataException {
+        reset();
+        state = ComponentState.READABLE_WRITABLE;
+    }
+
+    protected abstract boolean isFull();
+
+    protected abstract void reset() throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BTreeFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BTreeFactory.java
new file mode 100644
index 0000000..008c418
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BTreeFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManagerFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/** {@link TreeIndexFactory} that creates {@link BTree} instances. */
+public class BTreeFactory extends TreeIndexFactory<BTree> {
+
+    public BTreeFactory(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+            IFreePageManagerFactory freePageManagerFactory, ITreeIndexFrameFactory interiorFrameFactory,
+            ITreeIndexFrameFactory leafFrameFactory, IBinaryComparatorFactory[] cmpFactories, int fieldCount) {
+        super(bufferCache, fileMapProvider, freePageManagerFactory, interiorFrameFactory, leafFrameFactory,
+                cmpFactories, fieldCount);
+    }
+
+    /**
+     * Creates a BTree over the given file. A free page manager is requested
+     * from the free page manager factory on every call.
+     */
+    @Override
+    public BTree createIndexInstance(FileReference file) throws IndexException {
+        return new BTree(bufferCache, fileMapProvider, freePageManagerFactory.createFreePageManager(),
+                interiorFrameFactory, leafFrameFactory, cmpFactories, fieldCount, file);
+    }
+
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
new file mode 100644
index 0000000..34e1f0d
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java
@@ -0,0 +1,61 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+
+/**
+ * Decorates an {@link ILSMIOOperationCallback} so that a thread can block in
+ * {@link #waitForIO()} until {@link #afterFinalize(ILSMComponent)} has been
+ * invoked. All other callbacks are forwarded to the wrapped instance unchanged.
+ */
+public class BlockingIOOperationCallbackWrapper implements ILSMIOOperationCallback {
+
+    // Guarded by this object's monitor; set by afterFinalize(), consumed by waitForIO().
+    private boolean notified = false;
+
+    private final ILSMIOOperationCallback wrappedCallback;
+
+    public BlockingIOOperationCallbackWrapper(ILSMIOOperationCallback callback) {
+        this.wrappedCallback = callback;
+    }
+
+    /**
+     * Blocks until {@link #afterFinalize(ILSMComponent)} has signalled
+     * completion, then clears the signal so the wrapper can be waited on again.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public synchronized void waitForIO() throws InterruptedException {
+        // Re-check the flag in a loop: Object.wait() may return spuriously,
+        // i.e. without afterFinalize() having run.
+        while (!notified) {
+            this.wait();
+        }
+        notified = false;
+    }
+
+    @Override
+    public void beforeOperation() throws HyracksDataException {
+        wrappedCallback.beforeOperation();
+    }
+
+    @Override
+    public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+            throws HyracksDataException {
+        wrappedCallback.afterOperation(oldComponents, newComponent);
+    }
+
+    /**
+     * Forwards to the wrapped callback and then signals completion to a thread
+     * blocked in {@link #waitForIO()}.
+     */
+    @Override
+    public synchronized void afterFinalize(ILSMComponent newComponent) throws HyracksDataException {
+        wrappedCallback.afterFinalize(newComponent);
+        notified = true;
+        this.notifyAll();
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BloomFilterAwareBTreePointSearchCursor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BloomFilterAwareBTreePointSearchCursor.java
new file mode 100644
index 0000000..af08bdb
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BloomFilterAwareBTreePointSearchCursor.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.BloomFilter;
+import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor;
+
+/**
+ * A {@link BTreeRangeSearchCursor} that probes a {@link BloomFilter} with the search's low key before
+ * asking the underlying cursor whether a result exists.
+ */
+public class BloomFilterAwareBTreePointSearchCursor extends BTreeRangeSearchCursor {
+    // Scratch array passed to the bloom filter on every probe.
+    private long[] hashes = new long[2];
+    private final BloomFilter bloomFilter;
+
+    /**
+     * @param frame
+     *            leaf frame handed to the superclass constructor
+     * @param exclusiveLatchNodes
+     *            latch mode handed to the superclass constructor
+     * @param bloomFilter
+     *            filter probed by {@link #hasNext()}
+     */
+    public BloomFilterAwareBTreePointSearchCursor(IBTreeLeafFrame frame, boolean exclusiveLatchNodes,
+            BloomFilter bloomFilter) {
+        super(frame, exclusiveLatchNodes);
+        this.bloomFilter = bloomFilter;
+    }
+
+    /**
+     * @return {@code false} when the bloom filter rejects {@code lowKey}; otherwise whatever the
+     *         superclass cursor reports
+     */
+    @Override
+    public boolean hasNext() throws HyracksDataException {
+        // Short-circuit: the superclass cursor is consulted only when the filter reports a possible match.
+        return bloomFilter.contains(lowKey, hashes) && super.hasNext();
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
new file mode 100644
index 0000000..1c72abf
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+
+/**
+ * Merge policy that schedules a merge whenever the number of disk components reaches a fixed
+ * threshold.
+ */
+public class ConstantMergePolicy implements ILSMMergePolicy {
+
+    private final int threshold;
+
+    /**
+     * @param threshold
+     *            number of disk components at or above which a merge is scheduled
+     */
+    public ConstantMergePolicy(int threshold) {
+        this.threshold = threshold;
+    }
+
+    /**
+     * Schedules a merge on {@code index} when {@code totalNumDiskComponents} is at least the
+     * threshold; otherwise does nothing.
+     */
+    @Override
+    public void diskComponentAdded(final ILSMIndex index, int totalNumDiskComponents) throws HyracksDataException,
+            IndexException {
+        if (totalNumDiskComponents < threshold) {
+            return;
+        }
+        // The accessor and the merge are both created with no-op callbacks.
+        ILSMIndexAccessor mergeAccessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                NoOpOperationCallback.INSTANCE);
+        mergeAccessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
new file mode 100644
index 0000000..b404c9b
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ConstantMergePolicyProvider.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+
+public class ConstantMergePolicyProvider implements ILSMMergePolicyProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int threshold;
+
+ public ConstantMergePolicyProvider(int threshold) {
+ this.threshold = threshold;
+ }
+
+ @Override
+ public ILSMMergePolicy getMergePolicy(IHyracksTaskContext ctx) {
+ return new ConstantMergePolicy(threshold);
+ }
+
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/IndexFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/IndexFactory.java
new file mode 100644
index 0000000..3feaecf
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/IndexFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManagerFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * Base class for factories that create index instances of type {@code T} backed by a file.
+ * Holds the buffer cache, file map provider and free page manager factory for use by subclasses.
+ *
+ * @param <T>
+ *            the kind of index produced
+ */
+public abstract class IndexFactory<T extends IIndex> {
+
+    protected final IBufferCache bufferCache;
+    protected final IFileMapProvider fileMapProvider;
+    protected final IFreePageManagerFactory freePageManagerFactory;
+
+    /**
+     * Stores the three collaborators; no other work is done here.
+     */
+    public IndexFactory(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+            IFreePageManagerFactory freePageManagerFactory) {
+        this.freePageManagerFactory = freePageManagerFactory;
+        this.fileMapProvider = fileMapProvider;
+        this.bufferCache = bufferCache;
+    }
+
+    /**
+     * Creates an index instance for the given file.
+     *
+     * @param file
+     *            file reference the new index instance is created for
+     * @throws IndexException
+     *             if the concrete factory cannot create the index
+     */
+    public abstract T createIndexInstance(FileReference file) throws IndexException;
+
+    /**
+     * @return the buffer cache supplied at construction
+     */
+    public IBufferCache getBufferCache() {
+        return bufferCache;
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
new file mode 100644
index 0000000..019dca4
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFileReferences.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.io.FileReference;
+
+/**
+ * Immutable holder for the file references that make up one LSM component.
+ */
+public final class LSMComponentFileReferences {
+
+    /**
+     * The FileReference for the index that is used for inserting records of the component. For
+     * instance, this will be the FileReference of the RTree in one component of the LSM-RTree.
+     */
+    private final FileReference insertIndexFileReference;
+
+    /**
+     * The FileReference for the delete index (if any). For example, this will be the FileReference
+     * of the buddy BTree in one component of the LSM-RTree.
+     */
+    private final FileReference deleteIndexFileReference;
+
+    /** The FileReference for the bloom filter (if any). */
+    private final FileReference bloomFilterFileReference;
+
+    /**
+     * @param insertIndexFileReference
+     *            file of the insert index
+     * @param deleteIndexFileReference
+     *            file of the delete index, if the component has one
+     * @param bloomFilterFileReference
+     *            file of the bloom filter, if the component has one
+     */
+    public LSMComponentFileReferences(FileReference insertIndexFileReference, FileReference deleteIndexFileReference,
+            FileReference bloomFilterFileReference) {
+        this.bloomFilterFileReference = bloomFilterFileReference;
+        this.deleteIndexFileReference = deleteIndexFileReference;
+        this.insertIndexFileReference = insertIndexFileReference;
+    }
+
+    /** @return the insert index file reference given at construction */
+    public FileReference getInsertIndexFileReference() {
+        return insertIndexFileReference;
+    }
+
+    /** @return the delete index file reference given at construction */
+    public FileReference getDeleteIndexFileReference() {
+        return deleteIndexFileReference;
+    }
+
+    /** @return the bloom filter file reference given at construction */
+    public FileReference getBloomFilterFileReference() {
+        return bloomFilterFileReference;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentState.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentState.java
new file mode 100644
index 0000000..e554a6e
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentState.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+/**
+ * States of an LSM component with respect to flush and merge activity.
+ * NOTE(review): the transitions between these states are not defined in this file; confirm against
+ * the component implementations before relying on any ordering.
+ */
+public enum LSMComponentState {
+    FLUSHING, MERGING, DONE_FLUSHING, DONE_MERGING
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
new file mode 100644
index 0000000..4a140b4
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+
+/**
+ * Default {@link ILSMHarness}. Wraps each index operation with component entry/exit and
+ * operation-tracker bookkeeping, and delegates the actual work to an {@link ILSMIndexInternal}.
+ */
+public class LSMHarness implements ILSMHarness {
+    private final ILSMIndexInternal lsmIndex;
+    private final ILSMMergePolicy mergePolicy;
+    private final ILSMOperationTracker opTracker;
+
+    public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker) {
+        this.lsmIndex = lsmIndex;
+        this.opTracker = opTracker;
+        this.mergePolicy = mergePolicy;
+    }
+
+    /**
+     * Raises the index's flush status when it is not yet set and the in-memory free page manager
+     * reports that it is full, then informs the operation tracker that the operation is over.
+     */
+    private void threadExit(ILSMIndexOperationContext opCtx, LSMOperationType opType) throws HyracksDataException {
+        if (!lsmIndex.getFlushStatus(lsmIndex) && lsmIndex.getInMemoryFreePageManager().isFull()) {
+            lsmIndex.setFlushStatus(lsmIndex, true);
+        }
+        opTracker.afterOperation(opType, opCtx.getSearchOperationCallback(), opCtx.getModificationCallback());
+    }
+
+    /**
+     * Asks the index for the operational components of {@code ctx} and enters each of them for
+     * {@code opType}. If any component refuses entry, the components entered so far are exited again
+     * (as a failed operation) and, unless {@code tryOperation} is set, the whole attempt is repeated.
+     * On success the operation tracker's {@code beforeOperation} is invoked.
+     *
+     * @param tryOperation
+     *            when {@code true}, give up after the first unsuccessful attempt instead of retrying
+     * @return {@code true} once all components are entered; {@code false} only when
+     *         {@code tryOperation} is set and entry failed
+     * @throws HyracksDataException
+     *             wrapping an {@link InterruptedException} raised while entering a component
+     */
+    private boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, boolean tryOperation)
+            throws HyracksDataException {
+        int numEntered = 0;
+        boolean entranceSuccessful = false;
+
+        while (!entranceSuccessful) {
+            lsmIndex.getOperationalComponents(ctx);
+            List<ILSMComponent> components = ctx.getComponentHolder();
+            try {
+                for (ILSMComponent c : components) {
+                    if (!c.threadEnter(opType)) {
+                        break;
+                    }
+                    numEntered++;
+                }
+                entranceSuccessful = numEntered == components.size();
+            } catch (InterruptedException e) {
+                // NOTE(review): the thread's interrupt status is not restored before rethrowing.
+                entranceSuccessful = false;
+                throw new HyracksDataException(e);
+            } finally {
+                if (!entranceSuccessful) {
+                    // Undo the partial entry: exit exactly the first numEntered components, in entry order.
+                    for (ILSMComponent c : components) {
+                        if (numEntered <= 0) {
+                            break;
+                        }
+                        c.threadExit(opType, true);
+                        numEntered--;
+                    }
+                }
+            }
+            if (tryOperation && !entranceSuccessful) {
+                return false;
+            }
+        }
+
+        opTracker.beforeOperation(opType, ctx.getSearchOperationCallback(), ctx.getModificationCallback());
+        return true;
+    }
+
+    /**
+     * Exits every component currently held by {@code ctx} and then performs the harness-level
+     * {@link #threadExit}, even if exiting a component throws.
+     */
+    private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, boolean failedOperation)
+            throws HyracksDataException {
+        try {
+            for (ILSMComponent c : ctx.getComponentHolder()) {
+                c.threadExit(opType, failedOperation);
+            }
+        } finally {
+            threadExit(ctx, opType);
+        }
+    }
+
+    /** Performs a modification with {@link LSMOperationType#FORCE_MODIFICATION}, retrying entry until it succeeds. */
+    @Override
+    public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException,
+            IndexException {
+        LSMOperationType opType = LSMOperationType.FORCE_MODIFICATION;
+        modify(ctx, false, tuple, opType);
+    }
+
+    /**
+     * Performs a modification with {@link LSMOperationType#MODIFICATION}.
+     *
+     * @return {@code false} only when {@code tryOperation} is set and the components could not be entered
+     */
+    @Override
+    public boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
+            throws HyracksDataException, IndexException {
+        LSMOperationType opType = LSMOperationType.MODIFICATION;
+        return modify(ctx, tryOperation, tuple, opType);
+    }
+
+    /**
+     * Shared implementation of the modification entry points: enter components, apply the tuple,
+     * and always exit the components afterwards.
+     */
+    private boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple,
+            LSMOperationType opType) throws HyracksDataException, IndexException {
+        if (!getAndEnterComponents(ctx, opType, tryOperation)) {
+            return false;
+        }
+        try {
+            lsmIndex.modify(ctx, tuple);
+        } finally {
+            // The exit is reported as not failed even when lsmIndex.modify throws.
+            exitComponents(ctx, opType, false);
+        }
+
+        return true;
+    }
+
+    /**
+     * Enters the operational components and opens the search on {@code cursor}. On success the
+     * components stay entered until {@link #endSearch(ILSMIndexOperationContext)} is called; if
+     * opening the search fails for any reason, the components are exited here as a failed operation.
+     */
+    @Override
+    public void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred)
+            throws HyracksDataException, IndexException {
+        LSMOperationType opType = LSMOperationType.SEARCH;
+        getAndEnterComponents(ctx, opType, false);
+        boolean searchOpened = false;
+        try {
+            lsmIndex.search(ctx, cursor, pred);
+            searchOpened = true;
+        } finally {
+            // Covers unchecked exceptions as well as the declared ones, so entered components never leak.
+            if (!searchOpened) {
+                exitComponents(ctx, opType, true);
+            }
+        }
+    }
+
+    /** Exits the components of a search; does nothing when the context's operation is not SEARCH. */
+    @Override
+    public void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException {
+        if (ctx.getOperation() == IndexOperation.SEARCH) {
+            exitComponents(ctx, LSMOperationType.SEARCH, false);
+        }
+    }
+
+    /** Runs the tracker's before-operation hook and the harness-level exit without touching any component. */
+    @Override
+    public void noOp(ILSMIndexOperationContext ctx) throws HyracksDataException {
+        LSMOperationType opType = LSMOperationType.NOOP;
+        opTracker.beforeOperation(opType, ctx.getSearchOperationCallback(), ctx.getModificationCallback());
+        threadExit(ctx, opType);
+    }
+
+    /**
+     * Tries once to enter the components for a flush; if that fails, returns without scheduling.
+     * Otherwise clears the index's flush status and asks the index to schedule the flush.
+     */
+    @Override
+    public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+            throws HyracksDataException {
+        if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) {
+            return;
+        }
+        lsmIndex.setFlushStatus(lsmIndex, false);
+        lsmIndex.scheduleFlush(ctx, callback);
+    }
+
+    /**
+     * Executes a flush: runs the operation's callbacks around the index flush, marks the new
+     * component valid, adds it to the index, notifies the merge policy and exits the components.
+     * NOTE(review): the components are not exited if any step before the last line throws.
+     */
+    @Override
+    public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
+            IndexException {
+        operation.getCallback().beforeOperation();
+        ILSMComponent newComponent = lsmIndex.flush(operation);
+        operation.getCallback().afterOperation(null, newComponent);
+        lsmIndex.markAsValid(newComponent);
+        operation.getCallback().afterFinalize(newComponent);
+
+        lsmIndex.addComponent(newComponent);
+        int numComponents = lsmIndex.getImmutableComponents().size();
+
+        mergePolicy.diskComponentAdded(lsmIndex, numComponents);
+        exitComponents(ctx, LSMOperationType.FLUSH, false);
+    }
+
+    /**
+     * Enters the components for a merge and schedules it when more than one component is held;
+     * otherwise exits the components again, reporting a failed operation.
+     */
+    @Override
+    public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+            throws HyracksDataException, IndexException {
+        LSMOperationType opType = LSMOperationType.MERGE;
+        if (!getAndEnterComponents(ctx, opType, false)) {
+            return;
+        }
+        if (ctx.getComponentHolder().size() > 1) {
+            lsmIndex.scheduleMerge(ctx, callback);
+        } else {
+            exitComponents(ctx, opType, true);
+        }
+    }
+
+    /**
+     * Executes a merge: runs the operation's callbacks around the index merge, marks the new
+     * component valid, lets the index replace the merged components with it, and exits the components.
+     * NOTE(review): the components are not exited if any step before the last line throws.
+     */
+    @Override
+    public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
+            IndexException {
+        List<ILSMComponent> mergedComponents = new ArrayList<ILSMComponent>();
+        operation.getCallback().beforeOperation();
+        ILSMComponent newComponent = lsmIndex.merge(mergedComponents, operation);
+        // exitComponents() iterates the context's component holder, so the merged components are added to it.
+        ctx.getComponentHolder().addAll(mergedComponents);
+        operation.getCallback().afterOperation(mergedComponents, newComponent);
+        lsmIndex.markAsValid(newComponent);
+        operation.getCallback().afterFinalize(newComponent);
+        lsmIndex.subsumeMergedComponents(newComponent, mergedComponents);
+        exitComponents(ctx, LSMOperationType.MERGE, false);
+    }
+
+    /** Marks a bulk-loaded component valid, adds it to the index and notifies the merge policy. */
+    @Override
+    public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
+        lsmIndex.markAsValid(c);
+        lsmIndex.addComponent(c);
+        int numComponents = lsmIndex.getImmutableComponents().size();
+        mergePolicy.diskComponentAdded(lsmIndex, numComponents);
+    }
+
+    @Override
+    public ILSMOperationTracker getOperationTracker() {
+        return opTracker;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
new file mode 100644
index 0000000..7f08ba4
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public abstract class LSMIndexSearchCursor implements ITreeIndexCursor {
+ protected PriorityQueueElement outputElement;
+ protected IIndexCursor[] rangeCursors;
+ protected PriorityQueue<PriorityQueueElement> outputPriorityQueue;
+ protected PriorityQueueComparator pqCmp;
+ protected MultiComparator cmp;
+ protected boolean needPush;
+ protected boolean includeMemComponent;
+ protected ILSMHarness lsmHarness;
+ protected final ILSMIndexOperationContext opCtx;
+
+ protected List<ILSMComponent> operationalComponents;
+
+ public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx) {
+ this.opCtx = opCtx;
+ outputElement = null;
+ needPush = false;
+ }
+
+ public void initPriorityQueue() throws HyracksDataException, IndexException {
+ int pqInitSize = (rangeCursors.length > 0) ? rangeCursors.length : 1;
+ outputPriorityQueue = new PriorityQueue<PriorityQueueElement>(pqInitSize, pqCmp);
+ for (int i = 0; i < rangeCursors.length; i++) {
+ pushIntoPriorityQueue(new PriorityQueueElement(i));
+ }
+ }
+
+ public IIndexCursor getCursor(int cursorIndex) {
+ return rangeCursors[cursorIndex];
+ }
+
+ @Override
+ public void reset() throws HyracksDataException, IndexException {
+ outputElement = null;
+ needPush = false;
+
+ if (outputPriorityQueue != null) {
+ outputPriorityQueue.clear();
+ }
+
+ if (rangeCursors != null) {
+ for (int i = 0; i < rangeCursors.length; i++) {
+ rangeCursors[i].reset();
+ }
+ }
+ rangeCursors = null;
+ }
+
+ @Override
+ public boolean hasNext() throws HyracksDataException, IndexException {
+ checkPriorityQueue();
+ return !outputPriorityQueue.isEmpty();
+ }
+
+ @Override
+ public void next() throws HyracksDataException {
+ outputElement = outputPriorityQueue.poll();
+ needPush = true;
+ }
+
+ @Override
+ public ICachedPage getPage() {
+ // do nothing
+ return null;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (lsmHarness != null) {
+ try {
+ outputPriorityQueue.clear();
+ for (int i = 0; i < rangeCursors.length; i++) {
+ rangeCursors[i].close();
+ }
+ rangeCursors = null;
+ } finally {
+ lsmHarness.endSearch(opCtx);
+ }
+ }
+ }
+
+ @Override
+ public void setBufferCache(IBufferCache bufferCache) {
+ // do nothing
+ }
+
+ @Override
+ public void setFileId(int fileId) {
+ // do nothing
+ }
+
+ @Override
+ public ITupleReference getTuple() {
+ return outputElement.getTuple();
+ }
+
+ protected boolean pushIntoPriorityQueue(PriorityQueueElement e) throws HyracksDataException, IndexException {
+ int cursorIndex = e.getCursorIndex();
+ if (rangeCursors[cursorIndex].hasNext()) {
+ rangeCursors[cursorIndex].next();
+ e.reset(rangeCursors[cursorIndex].getTuple());
+ outputPriorityQueue.offer(e);
+ return true;
+ }
+ rangeCursors[cursorIndex].close();
+ return false;
+ }
+
+ protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
+ return ((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter();
+ }
+
+ abstract protected void checkPriorityQueue() throws HyracksDataException, IndexException;
+
+ @Override
+ public boolean exclusiveLatchNodes() {
+ return false;
+ }
+
+ public class PriorityQueueElement {
+ private ITupleReference tuple;
+ private final int cursorIndex;
+
+ public PriorityQueueElement(int cursorIndex) {
+ tuple = null;
+ this.cursorIndex = cursorIndex;
+ }
+
+ public ITupleReference getTuple() {
+ return tuple;
+ }
+
+ public int getCursorIndex() {
+ return cursorIndex;
+ }
+
+ public void reset(ITupleReference tuple) {
+ this.tuple = tuple;
+ }
+ }
+
+ public class PriorityQueueComparator implements Comparator<PriorityQueueElement> {
+
+ protected final MultiComparator cmp;
+
+ public PriorityQueueComparator(MultiComparator cmp) {
+ this.cmp = cmp;
+ }
+
+ @Override
+ public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) {
+ int result = cmp.compare(elementA.getTuple(), elementB.getTuple());
+ if (result != 0) {
+ return result;
+ }
+ if (elementA.getCursorIndex() > elementB.getCursorIndex()) {
+ return 1;
+ } else {
+ return -1;
+ }
+ }
+
+ public MultiComparator getMultiComparator() {
+ return cmp;
+ }
+ }
+
+ protected void setPriorityQueueComparator() {
+ if (pqCmp == null || cmp != pqCmp.getMultiComparator()) {
+ pqCmp = new PriorityQueueComparator(cmp);
+ }
+ }
+
+ protected int compare(MultiComparator cmp, ITupleReference tupleA, ITupleReference tupleB) {
+ return cmp.compare(tupleA, tupleB);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java
new file mode 100644
index 0000000..981cefe
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMOperationType.java
@@ -0,0 +1,10 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+/**
+ * Kinds of operations that LSMHarness reports to components and to the operation tracker.
+ * In LSMHarness: SEARCH is used by search/endSearch, MODIFICATION by modify, FORCE_MODIFICATION by
+ * forceModify, FLUSH by scheduleFlush/flush, MERGE by scheduleMerge/merge and NOOP by noOp.
+ */
+public enum LSMOperationType {
+    SEARCH, MODIFICATION, FORCE_MODIFICATION, FLUSH, MERGE, NOOP
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
new file mode 100644
index 0000000..7cc29a5
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+
+public abstract class LSMTreeIndexAccessor implements ILSMIndexAccessorInternal {
+ protected ILSMHarness lsmHarness;
+ protected ILSMIndexOperationContext ctx;
+
+ public LSMTreeIndexAccessor(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) {
+ this.lsmHarness = lsmHarness;
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void insert(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.INSERT);
+ lsmHarness.modify(ctx, false, tuple);
+ }
+
+ @Override
+ public void update(ITupleReference tuple) throws HyracksDataException, IndexException {
+ // Update is the same as insert.
+ ctx.setOperation(IndexOperation.UPDATE);
+ lsmHarness.modify(ctx, false, tuple);
+ }
+
+ @Override
+ public void delete(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.DELETE);
+ lsmHarness.modify(ctx, false, tuple);
+ }
+
+ @Override
+ public void upsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.UPSERT);
+ lsmHarness.modify(ctx, false, tuple);
+ }
+
+ @Override
+ public boolean tryInsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.INSERT);
+ return lsmHarness.modify(ctx, true, tuple);
+ }
+
+ @Override
+ public boolean tryDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.DELETE);
+ return lsmHarness.modify(ctx, true, tuple);
+ }
+
+ @Override
+ public boolean tryUpdate(ITupleReference tuple) throws HyracksDataException, IndexException {
+ // Update is the same as insert.
+ ctx.setOperation(IndexOperation.UPDATE);
+ return lsmHarness.modify(ctx, true, tuple);
+ }
+
+ @Override
+ public boolean tryUpsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.UPSERT);
+ return lsmHarness.modify(ctx, true, tuple);
+ }
+
+ @Override
+ public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.SEARCH);
+ lsmHarness.search(ctx, cursor, searchPred);
+ }
+
+ @Override
+ public void flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+ lsmHarness.flush(ctx, operation);
+ }
+
+ @Override
+ public void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.MERGE);
+ lsmHarness.merge(ctx, operation);
+ }
+
+ @Override
+ public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.PHYSICALDELETE);
+ lsmHarness.modify(ctx, false, tuple);
+ }
+
+ @Override
+ public void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException {
+ ctx.setOperation(IndexOperation.FLUSH);
+ lsmHarness.scheduleFlush(ctx, callback);
+ }
+
+ @Override
+ public void scheduleMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.MERGE);
+ lsmHarness.scheduleMerge(ctx, callback);
+ }
+
+ @Override
+ public void noOp() throws HyracksDataException {
+ lsmHarness.noOp(ctx);
+ }
+
+ @Override
+ public void forcePhysicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.PHYSICALDELETE);
+ lsmHarness.forceModify(ctx, tuple);
+ }
+
+ @Override
+ public void forceInsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.INSERT);
+ lsmHarness.forceModify(ctx, tuple);
+ }
+
+ @Override
+ public void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.DELETE);
+ lsmHarness.forceModify(ctx, tuple);
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
new file mode 100644
index 0000000..5d36c09
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -0,0 +1,14 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+
+/**
+ * Merge policy that never requests a merge: the notification that a disk
+ * component was added is simply ignored. Written as a single-constant enum, so
+ * {@link #INSTANCE} is the only instance.
+ */
+public enum NoMergePolicy implements ILSMMergePolicy {
+ INSTANCE;
+
+ /**
+ * Intentionally empty; both arguments are ignored and nothing is scheduled.
+ *
+ * @param index
+ *            unused
+ * @param totalNumDiskComponents
+ *            unused
+ */
+ @Override
+ public void diskComponentAdded(ILSMIndex index, int totalNumDiskComponents) {
+ // Do nothing
+ }
+
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallback.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallback.java
new file mode 100644
index 0000000..b123b30
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallback.java
@@ -0,0 +1,34 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+/**
+ * I/O operation callback whose hooks are all empty. The same enum constant also
+ * serves as its own {@link ILSMIOOperationCallbackProvider}, returning
+ * {@link #INSTANCE} for any index.
+ */
+public enum NoOpIOOperationCallback implements ILSMIOOperationCallback, ILSMIOOperationCallbackProvider {
+ INSTANCE;
+
+ /** Intentionally empty. */
+ @Override
+ public void beforeOperation() throws HyracksDataException {
+ // Do nothing.
+ }
+
+ /** Intentionally empty; both arguments are ignored. */
+ @Override
+ public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+ throws HyracksDataException {
+ // Do nothing.
+ }
+
+ /** Intentionally empty; the argument is ignored. */
+ @Override
+ public void afterFinalize(ILSMComponent newComponent) throws HyracksDataException {
+ // Do nothing.
+ }
+
+ /**
+ * @param index
+ *            ignored
+ * @return the shared {@link #INSTANCE}
+ */
+ @Override
+ public ILSMIOOperationCallback getIOOperationCallback(ILSMIndex index) {
+ return INSTANCE;
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
new file mode 100644
index 0000000..97ec50e
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpOperationTrackerFactory.java
@@ -0,0 +1,48 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+
+/**
+ * Factory for an operation tracker that does nothing.
+ * WARNING: This op tracker should only be used for specific testing purposes.
+ * It is assumed that an op tracker cooperates with an lsm index to synchronize flushes with
+ * regular operations, and this implementation does no such tracking at all.
+ */
+public class NoOpOperationTrackerFactory implements ILSMOperationTrackerFactory {
+    private static final long serialVersionUID = 1L;
+
+    /** The only instance of this factory; final so the singleton cannot be reassigned. */
+    public static final NoOpOperationTrackerFactory INSTANCE = new NoOpOperationTrackerFactory();
+
+    // Enforce singleton.
+    private NoOpOperationTrackerFactory() {
+    }
+
+    /**
+     * Creates a fresh tracker whose three hooks are all empty.
+     *
+     * @param index
+     *            ignored
+     * @return a new tracker that performs no tracking
+     */
+    @Override
+    public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
+        return new ILSMOperationTracker() {
+
+            @Override
+            public void beforeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+                    IModificationOperationCallback modificationCallback) throws HyracksDataException {
+                // Do nothing.
+            }
+
+            @Override
+            public void afterOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+                    IModificationOperationCallback modificationCallback) throws HyracksDataException {
+                // Do nothing.
+            }
+
+            @Override
+            public void completeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+                    IModificationOperationCallback modificationCallback) throws HyracksDataException {
+                // Do nothing.
+            }
+        };
+    }
+
+    /**
+     * Keeps the singleton guarantee across Java serialization (the class declares a
+     * serialVersionUID, so it is presumably serialized): any deserialized copy is
+     * replaced by {@link #INSTANCE}.
+     */
+    private Object readResolve() {
+        return INSTANCE;
+    }
+
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
new file mode 100644
index 0000000..9bbd394
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
@@ -0,0 +1,19 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+
+/**
+ * Scheduler that runs every I/O operation immediately, on the calling thread,
+ * instead of queueing it. Single-constant enum: {@link #INSTANCE} is the only
+ * instance.
+ */
+public enum SynchronousScheduler implements ILSMIOOperationScheduler {
+    INSTANCE;
+
+    /**
+     * Performs {@code operation} inline before returning.
+     *
+     * @param operation
+     *            the I/O operation to perform
+     * @throws HyracksDataException
+     *             if the operation throws one, or wrapping any {@link IndexException}
+     *             the operation throws
+     */
+    @Override
+    public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException {
+        try {
+            operation.perform();
+        } catch (IndexException indexFailure) {
+            // The scheduler interface only allows HyracksDataException; keep the cause.
+            throw new HyracksDataException(indexFailure);
+        }
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SynchronousSchedulerProvider.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SynchronousSchedulerProvider.java
new file mode 100644
index 0000000..72d9d1d
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/SynchronousSchedulerProvider.java
@@ -0,0 +1,15 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+
+/**
+ * Scheduler provider that always hands out {@link SynchronousScheduler#INSTANCE}.
+ * Single-constant enum: {@link #INSTANCE} is the only instance.
+ */
+public enum SynchronousSchedulerProvider implements ILSMIOOperationSchedulerProvider {
+ INSTANCE;
+
+ /**
+ * @param ctx
+ *            ignored
+ * @return {@link SynchronousScheduler#INSTANCE}
+ */
+ @Override
+ public ILSMIOOperationScheduler getIOScheduler(IHyracksTaskContext ctx) {
+ return SynchronousScheduler.INSTANCE;
+ }
+
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
new file mode 100644
index 0000000..3b4b00f
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingOperationTrackerFactory.java
@@ -0,0 +1,21 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+
+/**
+ * Singleton factory that creates a new {@link ThreadCountingTracker} for each
+ * index it is asked about.
+ */
+public class ThreadCountingOperationTrackerFactory implements ILSMOperationTrackerFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The only instance of this factory; final so the singleton cannot be reassigned. */
+    public static final ThreadCountingOperationTrackerFactory INSTANCE = new ThreadCountingOperationTrackerFactory();
+
+    // Enforce singleton.
+    private ThreadCountingOperationTrackerFactory() {
+    }
+
+    /**
+     * @param index
+     *            the index the new tracker is bound to
+     * @return a new {@link ThreadCountingTracker} for {@code index}
+     */
+    @Override
+    public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
+        return new ThreadCountingTracker(index);
+    }
+
+    /**
+     * Keeps the singleton guarantee across Java serialization (the class declares a
+     * serialVersionUID, so it is presumably serialized): any deserialized copy is
+     * replaced by {@link #INSTANCE}.
+     */
+    private Object readResolve() {
+        return INSTANCE;
+    }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
new file mode 100644
index 0000000..7fee06e
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ThreadCountingTracker.java
@@ -0,0 +1,49 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+
+public class ThreadCountingTracker implements ILSMOperationTracker {
+ private final AtomicInteger threadRefCount;
+ private final ILSMIndex index;
+
+ public ThreadCountingTracker(ILSMIndex index) {
+ this.index = index;
+ this.threadRefCount = new AtomicInteger();
+ }
+
+ @Override
+ public void beforeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ if (opType == LSMOperationType.MODIFICATION) {
+ threadRefCount.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void afterOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ // The operation is considered inactive, immediately after leaving the index.
+ completeOperation(opType, searchCallback, modificationCallback);
+ }
+
+ @Override
+ public void completeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
+ IModificationOperationCallback modificationCallback) throws HyracksDataException {
+ // Flush will only be handled by last exiting thread.
+ if (opType == LSMOperationType.MODIFICATION) {
+ if (threadRefCount.decrementAndGet() == 0 && index.getFlushStatus(index)) {
+ ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleFlush(NoOpIOOperationCallback.INSTANCE);
+ }
+ }
+ }
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/TreeIndexFactory.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/TreeIndexFactory.java
new file mode 100644
index 0000000..f570058
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/TreeIndexFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManagerFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * Base class for factories of tree indexes. On top of what {@code IndexFactory}
+ * receives, it holds the frame factories, comparator factories and field count
+ * for subclasses to use.
+ *
+ * @param <T>
+ *            the concrete tree index type
+ */
+public abstract class TreeIndexFactory<T extends ITreeIndex> extends IndexFactory<T> {
+
+ // Exposed to subclasses; all assigned once in the constructor.
+ protected final ITreeIndexFrameFactory interiorFrameFactory;
+ protected final ITreeIndexFrameFactory leafFrameFactory;
+ protected final IBinaryComparatorFactory[] cmpFactories;
+ protected final int fieldCount;
+
+ /**
+ * Forwards the buffer cache, file map provider and free page manager factory
+ * to the superclass and stores the remaining arguments in the corresponding
+ * fields.
+ *
+ * @param bufferCache
+ *            passed to the superclass constructor
+ * @param fileMapProvider
+ *            passed to the superclass constructor
+ * @param freePageManagerFactory
+ *            passed to the superclass constructor
+ * @param interiorFrameFactory
+ *            stored in {@link #interiorFrameFactory}
+ * @param leafFrameFactory
+ *            stored in {@link #leafFrameFactory}
+ * @param cmpFactories
+ *            stored in {@link #cmpFactories}; the array reference is kept as-is, not copied
+ * @param fieldCount
+ *            stored in {@link #fieldCount}
+ */
+ public TreeIndexFactory(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+ IFreePageManagerFactory freePageManagerFactory, ITreeIndexFrameFactory interiorFrameFactory,
+ ITreeIndexFrameFactory leafFrameFactory, IBinaryComparatorFactory[] cmpFactories, int fieldCount) {
+ super(bufferCache, fileMapProvider, freePageManagerFactory);
+ this.interiorFrameFactory = interiorFrameFactory;
+ this.leafFrameFactory = leafFrameFactory;
+ this.cmpFactories = cmpFactories;
+ this.fieldCount = fieldCount;
+ }
+}
\ No newline at end of file