Removed atomic rename for LSM components. Using special value in metadata page of trees to guarantee consistency of written LSM components. When adding a new LSM component, it is forced to disk, and then the special value is force to disk.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1173 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index 5fce2a1..0389034 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -893,6 +893,11 @@
return fileId;
}
+ @Override
+ public IBufferCache getBufferCache() {
+ return bufferCache;
+ }
+
public byte getTreeHeight(IBTreeLeafFrame leafFrame) throws HyracksDataException {
ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
rootNode.acquireReadLatch();
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IFreePageManager.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IFreePageManager.java
index 045ff9d..6db933d 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IFreePageManager.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/IFreePageManager.java
@@ -26,4 +26,6 @@
public boolean isMetaPage(ITreeIndexMetaDataFrame metaFrame);
public boolean isFreePage(ITreeIndexMetaDataFrame metaFrame);
+
+ public int getFirstMetadataPage();
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndex.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndex.java
index 1def8db..384e66f 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndex.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndex.java
@@ -18,6 +18,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
/**
* Interface describing the operations of tree-based index structures. Indexes
@@ -110,4 +111,9 @@
* @return The file id of this index.
*/
public int getFileId();
+
+ /**
+ * @return BufferCache underlying this tree index.
+ */
+ public IBufferCache getBufferCache();
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java
index 17519ae..9e95970 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java
@@ -41,4 +41,10 @@
public boolean hasSpace();
public void addFreePage(int freePage);
+
+ // Special flag for LSM-Components to mark whether they are valid or not.
+ public boolean isValid();
+
+ // Set special validity flag.
+ public void setValid(boolean isValid);
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
index c87b84a..31c674d 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
@@ -27,11 +27,15 @@
public class LIFOMetaDataFrame implements ITreeIndexMetaDataFrame {
+ // Arbitrarily chosen magic integer.
+ protected static final int MAGIC_VALID_INT = 0x5bd1e995;
+
protected static final int tupleCountOff = 0; //0
protected static final int freeSpaceOff = tupleCountOff + 4; //4
protected static final int maxPageOff = freeSpaceOff + 4; //8
protected static final int levelOff = maxPageOff + 12; //20
protected static final int nextPageOff = levelOff + 1; // 21
+ protected static final int validOff = nextPageOff + 4; // 25
protected ICachedPage page = null;
protected ByteBuffer buf = null;
@@ -65,7 +69,7 @@
return buf.getInt(freeSpaceOff) + 4 < buf.capacity();
}
- // on bounds checking is done, there must be free space
+ // no bounds checking is done, there must be free space
public void addFreePage(int freePage) {
int freeSpace = buf.getInt(freeSpaceOff);
buf.putInt(freeSpace, freePage);
@@ -97,10 +101,11 @@
@Override
public void initBuffer(byte level) {
buf.putInt(tupleCountOff, 0);
- buf.putInt(freeSpaceOff, nextPageOff + 4);
+ buf.putInt(freeSpaceOff, validOff + 4);
//buf.putInt(maxPageOff, -1);
buf.put(levelOff, level);
buf.putInt(nextPageOff, -1);
+ setValid(false);
}
@Override
@@ -112,4 +117,18 @@
public void setNextPage(int nextPage) {
buf.putInt(nextPageOff, nextPage);
}
+
+ @Override
+ public boolean isValid() {
+ return buf.getInt(validOff) == MAGIC_VALID_INT;
+ }
+
+ @Override
+ public void setValid(boolean isValid) {
+ if (isValid) {
+ buf.putInt(validOff, MAGIC_VALID_INT);
+ } else {
+ buf.putInt(validOff, 0);
+ }
+ }
}
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/freepage/LinkedListFreePageManager.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/freepage/LinkedListFreePageManager.java
index f7e2e3b..e72de8b 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/freepage/LinkedListFreePageManager.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/freepage/LinkedListFreePageManager.java
@@ -199,4 +199,9 @@
public boolean isMetaPage(ITreeIndexMetaDataFrame metaFrame) {
return metaFrame.getLevel() == META_PAGE_LEVEL_INDICATOR;
}
+
+ @Override
+ public int getFirstMetadataPage() {
+ return headPage;
+ }
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index ef481ba..f6849eb 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -43,11 +43,13 @@
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexComponentFinalizer;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -71,6 +73,8 @@
private final IFileMapProvider diskFileMapProvider;
// List of BTree instances. Using Object for better sharing via ILSMTree + LSMHarness.
private LinkedList<Object> diskBTrees = new LinkedList<Object>();
+ // Helps to guarantees physical consistency of LSM components.
+ private final ILSMComponentFinalizer componentFinalizer;
// Common for in-memory and on-disk components.
private final ITreeIndexFrameFactory insertLeafFrameFactory;
@@ -94,6 +98,7 @@
this.diskBTrees = new LinkedList<Object>();
this.fileManager = fileNameManager;
lsmHarness = new LSMHarness(this);
+ componentFinalizer = new TreeIndexComponentFinalizer();
}
@Override
@@ -116,7 +121,8 @@
List<Object> validFileNames = fileManager.cleanupAndGetValidFiles();
for (Object o : validFileNames) {
String fileName = (String) o;
- BTree btree = createDiskBTree(diskBTreeFactory, fileName, false);
+ FileReference fileRef = new FileReference(new File(fileName));
+ BTree btree = createDiskBTree(diskBTreeFactory, fileRef, false);
diskBTrees.add(btree);
}
}
@@ -124,8 +130,9 @@
@Override
public void close() throws HyracksDataException {
for (Object o : diskBTrees) {
- BTree btree = (BTree) o;
+ BTree btree = (BTree) o;
diskBufferCache.closeFile(btree.getFileId());
+ diskBufferCache.deleteFile(btree.getFileId(), false);
btree.close();
}
diskBTrees.clear();
@@ -243,7 +250,7 @@
ITreeIndexAccessor memBTreeAccessor = memBTree.createAccessor();
ITreeIndexCursor scanCursor = memBTreeAccessor.createSearchCursor();
memBTreeAccessor.search(scanCursor, nullPred);
- BTree diskBTree = createTempBTree(diskBTreeFactory);
+ BTree diskBTree = createFlushTarget();
// Bulk load the tuples from the in-memory BTree into the new disk BTree.
IIndexBulkLoadContext bulkLoadCtx = diskBTree.beginBulkLoad(1.0f);
try {
@@ -255,9 +262,6 @@
scanCursor.close();
}
diskBTree.endBulkLoad(bulkLoadCtx);
-
- String finalFileName = (String) fileManager.getFlushFileName();
- rename(diskBTree, finalFileName);
return diskBTree;
}
@@ -272,46 +276,29 @@
memBTree.create(memBTree.getFileId());
}
- private String getMergeTargetFileName(List<Object> mergingDiskBTrees) throws HyracksDataException {
+ private BTree createBulkLoadTarget() throws HyracksDataException {
+ String relFlushFileName = (String) fileManager.getRelFlushFileName();
+ FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
+ return createDiskBTree(bulkLoadBTreeFactory, fileRef, true);
+ }
+
+ private BTree createFlushTarget() throws HyracksDataException {
+ String relFlushFileName = (String) fileManager.getRelFlushFileName();
+ FileReference fileRef = fileManager.createFlushFile(relFlushFileName);
+ return createDiskBTree(diskBTreeFactory, fileRef, true);
+ }
+
+ private BTree createMergeTarget(List<Object> mergingDiskBTrees) throws HyracksDataException {
BTree firstBTree = (BTree) mergingDiskBTrees.get(0);
BTree lastBTree = (BTree) mergingDiskBTrees.get(mergingDiskBTrees.size() - 1);
FileReference firstFile = diskFileMapProvider.lookupFileName(firstBTree.getFileId());
FileReference lastFile = diskFileMapProvider.lookupFileName(lastBTree.getFileId());
- String fileName = (String) fileManager.getMergeFileName(firstFile.getFile().getName(), lastFile.getFile()
- .getName());
- return fileName;
+ String relMergeFileName = (String) fileManager.getRelMergeFileName(firstFile.getFile().getName(), lastFile.getFile().getName());
+ FileReference fileRef = fileManager.createMergeFile(relMergeFileName);
+ return createDiskBTree(diskBTreeFactory, fileRef, true);
}
- private void rename(BTree srcTmpBTree, String dest) throws HyracksDataException {
- int tmpFileId = srcTmpBTree.getFileId();
- FileReference srcFileRef = diskFileMapProvider.lookupFileName(tmpFileId);
- diskBufferCache.closeFile(tmpFileId);
- diskBufferCache.deleteFile(tmpFileId, true);
- FileReference destFileRef = fileManager.rename(srcFileRef, dest);
- // File will be deleted during cleanup of merge().
- diskBufferCache.createFile(destFileRef);
- int newFileId = diskFileMapProvider.lookupFileId(destFileRef);
- diskBufferCache.openFile(newFileId);
- srcTmpBTree.open(newFileId);
- }
-
- private BTree createTempBTree(BTreeFactory factory) throws HyracksDataException {
- FileReference file = fileManager.createTempFile();
- // File will be deleted during rename().
- diskBufferCache.createFile(file);
- int diskBTreeFileId = diskFileMapProvider.lookupFileId(file);
- // File will be closed during rename().
- diskBufferCache.openFile(diskBTreeFileId);
- // Create new BTree instance.
- BTree diskBTree = factory.createBTreeInstance(diskBTreeFileId);
- diskBTree.create(diskBTreeFileId);
- // BTree will be closed during cleanup of merge().
- diskBTree.open(diskBTreeFileId);
- return diskBTree;
- }
-
- private BTree createDiskBTree(BTreeFactory factory, String fileName, boolean createBTree) throws HyracksDataException {
- FileReference fileRef = new FileReference(new File(fileName));
+ private BTree createDiskBTree(BTreeFactory factory, FileReference fileRef, boolean createBTree) throws HyracksDataException {
// File will be deleted during cleanup of merge().
diskBufferCache.createFile(fileRef);
int diskBTreeFileId = diskFileMapProvider.lookupFileId(fileRef);
@@ -371,13 +358,13 @@
mergedComponents.addAll(mergingDiskBTrees);
// Nothing to merge.
- if (mergedComponents.isEmpty()) {
+ if (mergedComponents.size() <= 1) {
+ cursor.close();
return null;
}
// Bulk load the tuples from all on-disk BTrees into the new BTree.
- //BTree mergedBTree = createMergeTargetBTree(mergedComponents);
- BTree mergedBTree = createTempBTree(diskBTreeFactory);
+ BTree mergedBTree = createMergeTarget(mergingDiskBTrees);
IIndexBulkLoadContext bulkLoadCtx = mergedBTree.beginBulkLoad(1.0f);
try {
while (cursor.hasNext()) {
@@ -388,10 +375,7 @@
} finally {
cursor.close();
}
- mergedBTree.endBulkLoad(bulkLoadCtx);
-
- String finalFileName = getMergeTargetFileName(mergingDiskBTrees);
- rename(mergedBTree, finalFileName);
+ mergedBTree.endBulkLoad(bulkLoadCtx);
return mergedBTree;
}
@@ -407,6 +391,7 @@
BTree oldBTree = (BTree) o;
FileReference fileRef = diskFileMapProvider.lookupFileName(oldBTree.getFileId());
diskBufferCache.closeFile(oldBTree.getFileId());
+ diskBufferCache.deleteFile(oldBTree.getFileId(), false);
oldBTree.close();
fileRef.getFile().delete();
}
@@ -445,7 +430,7 @@
@Override
public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws TreeIndexException, HyracksDataException {
- BTree diskBTree = createTempBTree(bulkLoadBTreeFactory);
+ BTree diskBTree = createBulkLoadTarget();
LSMTreeBulkLoadContext bulkLoadCtx = new LSMTreeBulkLoadContext(diskBTree);
bulkLoadCtx.beginBulkLoad(fillFactor);
return bulkLoadCtx;
@@ -461,8 +446,6 @@
public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
LSMTreeBulkLoadContext bulkLoadCtx = (LSMTreeBulkLoadContext) ictx;
bulkLoadCtx.getBTree().endBulkLoad(bulkLoadCtx.getBulkLoadCtx());
- String finalFileName = (String) fileManager.getFlushFileName();
- rename(bulkLoadCtx.getBTree(), finalFileName);
lsmHarness.addBulkLoadedComponent(bulkLoadCtx.getBTree());
}
@@ -529,4 +512,14 @@
return concreteCtx.cmp;
}
}
+
+ @Override
+ public ILSMComponentFinalizer getComponentFinalizer() {
+ return componentFinalizer;
+ }
+
+ @Override
+ public IBufferCache getBufferCache() {
+ return diskBufferCache;
+ }
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFinalizer.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFinalizer.java
new file mode 100644
index 0000000..3423e75
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMComponentFinalizer.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ILSMComponentFinalizer {
+
+ /**
+ * @return Whether the given LSM component is considered valid. Used for guaranteeing
+ * atomicity of LSM component writes.
+ */
+ public boolean isValid(Object lsmComponent) throws HyracksDataException;
+
+ /**
+ * Marks the given LSM component as physically valid, synchronously forcing
+ * the necessary information to disk. This call only return once the
+ * physical consistency of the given component is guaranteed.
+ *
+ */
+ public void finalize(Object lsmComponent) throws HyracksDataException;
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFileManager.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFileManager.java
index af1b942..1184b76 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFileManager.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMFileManager.java
@@ -35,17 +35,16 @@
public interface ILSMFileManager {
public void createDirs();
- public Object getFlushFileName();
+ public FileReference createFlushFile(String relFlushFileName);
+
+ public FileReference createMergeFile(String relMergeFileName);
+
+ public Object getRelFlushFileName();
- public Object getMergeFileName(String firstFileName, String lastFileName) throws HyracksDataException;
+ public Object getRelMergeFileName(String firstFileName, String lastFileName) throws HyracksDataException;
public String getBaseDir();
- public FileReference createTempFile() throws HyracksDataException;
-
- // Atomically renames src file ref to dest on same IODevice as src, and returns file ref of dest.
- public FileReference rename(FileReference src, String dest) throws HyracksDataException;
-
// Deletes invalid files, and returns list of valid files from baseDir.
// The returned valid files are correctly sorted (based on the recency of data).
public List<Object> cleanupAndGetValidFiles() throws HyracksDataException;
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java
index e272ec8..dd1e3f9 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMTree.java
@@ -58,4 +58,6 @@
public void resetInMemoryComponent() throws HyracksDataException;
public List<Object> getDiskComponents();
+
+ public ILSMComponentFinalizer getComponentFinalizer();
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java
index 1e2da77..303369e 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryBufferCache.java
@@ -174,4 +174,12 @@
latch.writeLock().unlock();
}
}
+
+ @Override
+ public void force(int fileId, boolean metadata) throws HyracksDataException {
+ }
+
+ @Override
+ public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+ }
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryFreePageManager.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryFreePageManager.java
index 304aa43..846c274 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryFreePageManager.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/freepage/InMemoryFreePageManager.java
@@ -93,4 +93,10 @@
public boolean isFreePage(ITreeIndexMetaDataFrame metaFrame) {
return false;
}
+
+ @Override
+ public int getFirstMetadataPage() {
+ // Method doesn't make sense for this free page manager.
+ return -1;
+ }
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 033e68c..222d627 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -122,10 +122,17 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Flushing LSM-Tree.");
}
- Object newDiskComponent = lsmTree.flush();
+ Object newComponent = lsmTree.flush();
+
+ // The implementation of this call must take any necessary steps to make
+ // the new component permanent, and mark it as valid (usually this means
+ // forcing all pages of the tree to disk, possibly with some extra
+ // information to mark the tree as valid).
+ lsmTree.getComponentFinalizer().finalize(newComponent);
+
lsmTree.resetInMemoryComponent();
synchronized (diskComponentsSync) {
- lsmTree.addFlushedComponent(newDiskComponent);
+ lsmTree.addFlushedComponent(newComponent);
}
}
@@ -182,6 +189,7 @@
Object newComponent = lsmTree.merge(mergedComponents);
// No merge happened.
if (newComponent == null) {
+ isMerging.set(false);
return;
}
@@ -210,6 +218,12 @@
}
}
+ // The implementation of this call must take any necessary steps to make
+ // the new component permanent, and mark it as valid (usually this means
+ // forcing all pages of the tree to disk, possibly with some extra
+ // information to mark the tree as valid).
+ lsmTree.getComponentFinalizer().finalize(newComponent);
+
// Cleanup. At this point we have guaranteed that no searchers are
// touching the old on-disk Trees (localSearcherRefCount == 0).
lsmTree.cleanUpAfterMerge(mergedComponents);
@@ -230,8 +244,13 @@
searcherRefCount.decrementAndGet();
}
- public void addBulkLoadedComponent(Object index) {
- synchronized (diskComponentsSync) {
+ public void addBulkLoadedComponent(Object index) throws HyracksDataException {
+ // The implementation of this call must take any necessary steps to make
+ // the new component permanent, and mark it as valid (usually this means
+ // forcing all pages of the tree to disk, possibly with some extra
+ // information to mark the tree as valid).
+ lsmTree.getComponentFinalizer().finalize(index);
+ synchronized (diskComponentsSync) {
lsmTree.addFlushedComponent(index);
}
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeFileManager.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeFileManager.java
index c30a999..2c179e7 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeFileManager.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeFileManager.java
@@ -17,7 +17,6 @@
import java.io.File;
import java.io.FilenameFilter;
-import java.io.IOException;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -35,7 +34,6 @@
public class LSMTreeFileManager implements ILSMFileManager {
protected static final String SPLIT_STRING = "_";
- protected static final String TEMP_FILE_PREFIX = "lsm_tree";
// Currently uses all IODevices registered in ioManager in a round-robin fashion.
protected final IOManager ioManager;
@@ -44,6 +42,8 @@
protected final Format formatter = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS");
protected final Comparator<String> cmp = new FileNameComparator();
protected final Comparator<ComparableFileName> recencyCmp = new RecencyComparator();
+ // To implement round-robin assignment of files onto I/O devices.
+ private int ioDeviceIndex = 0;
public LSMTreeFileManager(IOManager ioManager, String baseDir) {
if (!baseDir.endsWith(System.getProperty("file.separator"))) {
@@ -62,27 +62,19 @@
}
}
- @Override
- public FileReference createTempFile() throws HyracksDataException {
- // Cycles through the IODevices in round-robin fashion.
- return ioManager.createWorkspaceFile(TEMP_FILE_PREFIX);
+ public FileReference createFlushFile(String relFlushFileName) {
+ // Assigns new files to I/O devices in round-robin fashion.
+ IODeviceHandle dev = ioManager.getIODevices().get(ioDeviceIndex);
+ ioDeviceIndex = (ioDeviceIndex + 1) % ioManager.getIODevices().size();
+ return dev.createFileReference(relFlushFileName);
}
- // Atomically renames src fileref to dest on same IODevice as src, and returns file ref of dest.
- @Override
- public FileReference rename(FileReference src, String dest) throws HyracksDataException {
- FileReference destFile = new FileReference(src.getDevideHandle(), dest);
- //try {
- //Files.move(src.getFile().toPath(), destFile.getFile().toPath(), StandardCopyOption.ATOMIC_MOVE);
- src.getFile().renameTo(destFile.getFile());
- //} catch (IOException e) {
- // throw new HyracksDataException(e);
- //}
- return destFile;
+ public FileReference createMergeFile(String relMergeFileName) {
+ return createFlushFile(relMergeFileName);
}
@Override
- public Object getFlushFileName() {
+ public Object getRelFlushFileName() {
Date date = new Date();
String ts = formatter.format(date);
// Begin timestamp and end timestamp are identical.
@@ -90,7 +82,7 @@
}
@Override
- public Object getMergeFileName(String firstFileName, String lastFileName) throws HyracksDataException {
+ public Object getRelMergeFileName(String firstFileName, String lastFileName) throws HyracksDataException {
String[] firstTimestampRange = firstFileName.split(SPLIT_STRING);
String[] lastTimestampRange = lastFileName.split(SPLIT_STRING);
// Enclosing timestamp range.
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/TreeIndexComponentFinalizer.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/TreeIndexComponentFinalizer.java
new file mode 100644
index 0000000..09cfff3
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/TreeIndexComponentFinalizer.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+
+public class TreeIndexComponentFinalizer implements ILSMComponentFinalizer {
+
+ @Override
+ public boolean isValid(Object lsmComponent) throws HyracksDataException {
+ ITreeIndex treeIndex = (ITreeIndex) lsmComponent;
+ IBufferCache bufferCache = treeIndex.getBufferCache();
+ int metadataPage = treeIndex.getFreePageManager().getFirstMetadataPage();
+ ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
+ ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(treeIndex.getFileId(), metadataPage), false);
+ page.acquireReadLatch();
+ try {
+ metadataFrame.setPage(page);
+ return metadataFrame.isValid();
+ } finally {
+ page.releaseReadLatch();
+ bufferCache.unpin(page);
+ }
+ }
+
+ @Override
+ public void finalize(Object lsmComponent) throws HyracksDataException {
+ ITreeIndex treeIndex = (ITreeIndex) lsmComponent;
+ int fileId = treeIndex.getFileId();
+ IBufferCache bufferCache = treeIndex.getBufferCache();
+ // Flush all dirty pages of the BTree.
+ // By default, metadata and data are flushed async in the buffercache.
+ // This means that the flush issues writes to the OS, but the data may still lie in filesystem buffers.
+ ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
+ int startPage = 0;
+ int maxPage = treeIndex.getFreePageManager().getMaxPage(metadataFrame);
+ for (int i = startPage; i <= maxPage; i++) {
+ ICachedPage page = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, i));
+ // If tryPin returns null, it means the page is not cached, and therefore cannot be dirty.
+ if (page == null) {
+ continue;
+ }
+ try {
+ bufferCache.flushDirtyPage(page);
+ } finally {
+ bufferCache.unpin(page);
+ }
+ }
+ // Forces all pages of given file to disk. This guarantees the data makes it to disk.
+ bufferCache.force(fileId, true);
+ int metadataPageId = treeIndex.getFreePageManager().getFirstMetadataPage();
+ ICachedPage metadataPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, metadataPageId), false);
+ metadataPage.acquireWriteLatch();
+ try {
+ metadataFrame.setPage(metadataPage);
+ metadataFrame.setValid(true);
+ // Flush the single modified page to disk.
+ bufferCache.flushDirtyPage(metadataPage);
+ // Force modified metadata page to disk.
+ bufferCache.force(fileId, true);
+ } finally {
+ metadataPage.releaseWriteLatch();
+ bufferCache.unpin(metadataPage);
+ }
+ }
+}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index c897943..9a831d8 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -41,6 +41,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTree;
import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
@@ -95,7 +96,9 @@
// List of LSMRTreeComponent instances. Using Object for better sharing via
// ILSMTree + LSMHarness.
private final LinkedList<Object> diskComponents = new LinkedList<Object>();
-
+ // Helps to guarantees physical consistency of LSM components.
+ private final ILSMComponentFinalizer componentFinalizer;
+
private IBinaryComparatorFactory[] btreeCmpFactories;
private IBinaryComparatorFactory[] rtreeCmpFactories;
@@ -129,6 +132,7 @@
this.btreeCmpFactories = btreeCmpFactories;
this.rtreeCmpFactories = rtreeCmpFactories;
this.lsmHarness = new LSMHarness(this);
+ componentFinalizer = new LSMRTreeComponentFinalizer();
}
@Override
@@ -152,13 +156,14 @@
memComponent.getBTree().open(MEM_BTREE_FILE_ID);
List<Object> validFileNames = fileManager.cleanupAndGetValidFiles();
for (Object o : validFileNames) {
- LSMRTreeFileNameComponent component = (LSMRTreeFileNameComponent) o;
- RTree rtree = (RTree) createDiskTree(diskRTreeFactory, component.getRTreeFileName(), false);
- BTree btree = (BTree) createDiskTree(diskBTreeFactory, component.getBTreeFileName(), false);
+ LSMRTreeFileNameComponent component = (LSMRTreeFileNameComponent) o;
+ FileReference rtreeFile = new FileReference(new File(component.getRTreeFileName()));
+ FileReference btreeFile = new FileReference(new File(component.getBTreeFileName()));
+ RTree rtree = (RTree) createDiskTree(diskRTreeFactory, rtreeFile, false);
+ BTree btree = (BTree) createDiskTree(diskBTreeFactory, btreeFile, false);
LSMRTreeComponent diskComponent = new LSMRTreeComponent(rtree, btree);
diskComponents.add(diskComponent);
}
-
}
@Override
@@ -168,8 +173,10 @@
RTree rtree = diskComponent.getRTree();
BTree btree = diskComponent.getBTree();
diskBufferCache.closeFile(rtree.getFileId());
+ diskBufferCache.deleteFile(rtree.getFileId(), false);
rtree.close();
diskBufferCache.closeFile(btree.getFileId());
+ diskBufferCache.deleteFile(btree.getFileId(), false);
btree.close();
}
diskComponents.clear();
@@ -183,45 +190,15 @@
FileReference firstFile = diskFileMapProvider.lookupFileName(firstTree.getFileId());
FileReference lastFile = diskFileMapProvider.lookupFileName(lastTree.getFileId());
LSMRTreeFileNameComponent component = (LSMRTreeFileNameComponent) ((LSMRTreeFileManager) fileManager)
- .getMergeFileName(firstFile.getFile().getName(), lastFile.getFile().getName());
+ .getRelMergeFileName(firstFile.getFile().getName(), lastFile.getFile().getName());
return component;
}
- private void rename(ITreeIndex srcTmpTree, String dest) throws HyracksDataException {
- int tmpFileId = srcTmpTree.getFileId();
- FileReference srcFileRef = diskFileMapProvider.lookupFileName(tmpFileId);
- diskBufferCache.closeFile(tmpFileId);
- diskBufferCache.deleteFile(tmpFileId, true);
- FileReference destFileRef = fileManager.rename(srcFileRef, dest);
- // File will be deleted during cleanup of merge().
- diskBufferCache.createFile(destFileRef);
- int newFileId = diskFileMapProvider.lookupFileId(destFileRef);
- diskBufferCache.openFile(newFileId);
- srcTmpTree.open(newFileId);
- }
-
- private ITreeIndex createTempTree(TreeFactory diskTreeFactory) throws HyracksDataException {
- FileReference file = fileManager.createTempFile();
- // File will be deleted during rename().
- diskBufferCache.createFile(file);
- int diskTreeFileId = diskFileMapProvider.lookupFileId(file);
- // File will be closed during rename().
- diskBufferCache.openFile(diskTreeFileId);
- // Create new tree instance.
- ITreeIndex diskTree = diskTreeFactory.createIndexInstance(diskTreeFileId);
- diskTree.create(diskTreeFileId);
- // Tree will be closed during cleanup of merge().
- diskTree.open(diskTreeFileId);
- return diskTree;
- }
-
- protected ITreeIndex createDiskTree(TreeFactory diskTreeFactory, String fileName, boolean createTree)
+ protected ITreeIndex createDiskTree(TreeFactory diskTreeFactory, FileReference fileRef, boolean createTree)
throws HyracksDataException {
- // Register the new tree file.
- FileReference file = new FileReference(new File(fileName));
// File will be deleted during cleanup of merge().
- diskBufferCache.createFile(file);
- int diskTreeFileId = diskFileMapProvider.lookupFileId(file);
+ diskBufferCache.createFile(fileRef);
+ int diskTreeFileId = diskFileMapProvider.lookupFileId(fileRef);
// File will be closed during cleanup of merge().
diskBufferCache.openFile(diskTreeFileId);
// Create new tree instance.
@@ -236,10 +213,15 @@
@Override
public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws TreeIndexException, HyracksDataException {
- RTree diskRTree = (RTree) createTempTree(diskRTreeFactory);
+ // Note that by using a flush target file name, we state that the new
+ // bulk loaded tree is "newer" than any other merged tree.
+ LSMRTreeFileNameComponent fileNames = (LSMRTreeFileNameComponent) fileManager.getRelFlushFileName();
+ FileReference rtreeFile = fileManager.createFlushFile(fileNames.getRTreeFileName());
+ RTree diskRTree = (RTree) createDiskTree(diskRTreeFactory, rtreeFile, true);
// For each RTree, we require to have a buddy BTree. thus, we create an
// empty BTree.
- BTree diskBTree = (BTree) createTempTree(diskBTreeFactory);
+ FileReference btreeFile = fileManager.createFlushFile(fileNames.getBTreeFileName());
+ BTree diskBTree = (BTree) createDiskTree(diskBTreeFactory, btreeFile, true);
LSMRTreeBulkLoadContext bulkLoadCtx = new LSMRTreeBulkLoadContext(diskRTree, diskBTree);
bulkLoadCtx.beginBulkLoad(fillFactor);
return bulkLoadCtx;
@@ -249,17 +231,12 @@
public void bulkLoadAddTuple(ITupleReference tuple, IIndexBulkLoadContext ictx) throws HyracksDataException {
LSMRTreeBulkLoadContext bulkLoadCtx = (LSMRTreeBulkLoadContext) ictx;
bulkLoadCtx.getRTree().bulkLoadAddTuple(tuple, bulkLoadCtx.getBulkLoadCtx());
-
}
@Override
public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
LSMRTreeBulkLoadContext bulkLoadCtx = (LSMRTreeBulkLoadContext) ictx;
bulkLoadCtx.getRTree().endBulkLoad(bulkLoadCtx.getBulkLoadCtx());
- LSMRTreeFileNameComponent component = (LSMRTreeFileNameComponent) ((LSMRTreeFileManager) fileManager)
- .getFlushFileName();
- rename(bulkLoadCtx.getRTree(), component.getRTreeFileName());
- rename(bulkLoadCtx.getBTree(), component.getBTreeFileName());
LSMRTreeComponent diskComponent = new LSMRTreeComponent(bulkLoadCtx.getRTree(), bulkLoadCtx.getBTree());
lsmHarness.addBulkLoadedComponent(diskComponent);
}
@@ -400,7 +377,9 @@
ITreeIndexCursor rtreeScanCursor = memRTreeAccessor.createSearchCursor();
SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
- RTree diskRTree = (RTree) createTempTree(diskRTreeFactory);
+ LSMRTreeFileNameComponent fileNames = (LSMRTreeFileNameComponent) fileManager.getRelFlushFileName();
+ FileReference rtreeFile = fileManager.createFlushFile(fileNames.getRTreeFileName());
+ RTree diskRTree = (RTree) createDiskTree(diskRTreeFactory, rtreeFile, true);
// BulkLoad the tuples from the in-memory tree into the new disk RTree.
IIndexBulkLoadContext rtreeBulkLoadCtx = diskRTree.beginBulkLoad(1.0f);
@@ -421,7 +400,8 @@
ITreeIndexCursor btreeScanCursor = memBTreeAccessor.createSearchCursor();
RangePredicate btreeNullPredicate = new RangePredicate(null, null, true, true, null, null);
memBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
- BTree diskBTree = (BTree) createTempTree(diskBTreeFactory);
+ FileReference btreeFile = fileManager.createFlushFile(fileNames.getBTreeFileName());
+ BTree diskBTree = (BTree) createDiskTree(diskBTreeFactory, btreeFile, true);
// BulkLoad the tuples from the in-memory tree into the new disk BTree.
IIndexBulkLoadContext btreeBulkLoadCtx = diskBTree.beginBulkLoad(1.0f);
@@ -435,12 +415,6 @@
btreeScanCursor.close();
}
diskBTree.endBulkLoad(btreeBulkLoadCtx);
-
- LSMRTreeFileNameComponent component = (LSMRTreeFileNameComponent) ((LSMRTreeFileManager) fileManager)
- .getFlushFileName();
- rename(diskRTree, component.getRTreeFileName());
- rename(diskBTree, component.getBTreeFileName());
-
return new LSMRTreeComponent(diskRTree, diskBTree);
}
@@ -458,13 +432,18 @@
mergedComponents.addAll(mergingComponents);
// Nothing to merge.
- if (mergedComponents.isEmpty()) {
+ if (mergedComponents.size() <= 1) {
+ cursor.close();
return null;
}
// Bulk load the tuples from all on-disk RTrees into the new RTree.
- RTree mergedRTree = (RTree) createTempTree(diskRTreeFactory);
- BTree mergedBTree = (BTree) createTempTree(diskBTreeFactory);
+ LSMRTreeFileNameComponent fileNames = getMergeTargetFileName(mergingComponents);
+ FileReference rtreeFile = fileManager.createMergeFile(fileNames.getRTreeFileName());
+ FileReference btreeFile = fileManager.createMergeFile(fileNames.getBTreeFileName());
+ RTree mergedRTree = (RTree) createDiskTree(diskRTreeFactory, rtreeFile, true);
+ BTree mergedBTree = (BTree) createDiskTree(diskBTreeFactory, btreeFile, true);
+
IIndexBulkLoadContext bulkLoadCtx = mergedRTree.beginBulkLoad(1.0f);
try {
while (cursor.hasNext()) {
@@ -476,10 +455,11 @@
cursor.close();
}
mergedRTree.endBulkLoad(bulkLoadCtx);
-
- LSMRTreeFileNameComponent component = getMergeTargetFileName(mergingComponents);
- rename(mergedRTree, component.getRTreeFileName());
- rename(mergedBTree, component.getBTreeFileName());
+
+ // Load an empty BTree tree.
+ IIndexBulkLoadContext btreeBulkLoadCtx = mergedBTree.beginBulkLoad(1.0f);
+ mergedBTree.endBulkLoad(btreeBulkLoadCtx);
+
return new LSMRTreeComponent(mergedRTree, mergedBTree);
}
@@ -496,11 +476,13 @@
BTree oldBTree = component.getBTree();
FileReference btreeFileRef = diskFileMapProvider.lookupFileName(oldBTree.getFileId());
diskBufferCache.closeFile(oldBTree.getFileId());
+ diskBufferCache.deleteFile(oldBTree.getFileId(), false);
oldBTree.close();
btreeFileRef.getFile().delete();
RTree oldRTree = component.getRTree();
FileReference rtreeFileRef = diskFileMapProvider.lookupFileName(oldRTree.getFileId());
diskBufferCache.closeFile(oldRTree.getFileId());
+ diskBufferCache.deleteFile(oldRTree.getFileId(), false);
oldRTree.close();
rtreeFileRef.getFile().delete();
}
@@ -561,4 +543,14 @@
public IBinaryComparatorFactory[] getComparatorFactories() {
return rtreeCmpFactories;
}
+
+ @Override
+ public IBufferCache getBufferCache() {
+ return diskBufferCache;
+ }
+
+ @Override
+ public ILSMComponentFinalizer getComponentFinalizer() {
+ return componentFinalizer;
+ }
}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponentFinalizer.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponentFinalizer.java
new file mode 100644
index 0000000..1a9d9b0
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeComponentFinalizer.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.TreeIndexComponentFinalizer;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTree.LSMRTreeComponent;
+
+public class LSMRTreeComponentFinalizer implements ILSMComponentFinalizer {
+
+ TreeIndexComponentFinalizer treeIndexFinalizer = new TreeIndexComponentFinalizer();
+
+ @Override
+ public boolean isValid(Object lsmComponent) throws HyracksDataException {
+ LSMRTreeComponent component = (LSMRTreeComponent) lsmComponent;
+ return treeIndexFinalizer.isValid(component.getRTree())
+ && treeIndexFinalizer.isValid(component.getBTree());
+ }
+
+ @Override
+ public void finalize(Object lsmComponent) throws HyracksDataException {
+ LSMRTreeComponent component = (LSMRTreeComponent) lsmComponent;
+ treeIndexFinalizer.finalize(component.getRTree());
+ treeIndexFinalizer.finalize(component.getBTree());
+ }
+}
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
index 07daa43..b30b48a 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java
@@ -37,16 +37,16 @@
}
@Override
- public Object getFlushFileName() {
- String baseName = (String) super.getFlushFileName();
+ public Object getRelFlushFileName() {
+ String baseName = (String) super.getRelFlushFileName();
return new LSMRTreeFileNameComponent(baseName + SPLIT_STRING + RTREE_STRING, baseName + SPLIT_STRING
+ BTREE_STRING);
}
@Override
- public Object getMergeFileName(String firstFileName, String lastFileName) throws HyracksDataException {
- String baseName = (String) super.getMergeFileName(firstFileName, lastFileName);
+ public Object getRelMergeFileName(String firstFileName, String lastFileName) throws HyracksDataException {
+ String baseName = (String) super.getRelMergeFileName(firstFileName, lastFileName);
return new LSMRTreeFileNameComponent(baseName + SPLIT_STRING + RTREE_STRING, baseName + SPLIT_STRING
+ BTREE_STRING);
}
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index f724d06..bbe5ba6 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.storage.common.buffercache;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
@@ -513,37 +514,56 @@
setPriority(MAX_PRIORITY);
}
+ public void cleanPage(CachedPage cPage, boolean force) {
+ if (cPage.dirty.get()) {
+ boolean proceed = false;
+ if (force) {
+ cPage.latch.writeLock().lock();
+ proceed = true;
+ } else {
+ proceed = cPage.latch.readLock().tryLock();
+ }
+ if (proceed) {
+ try {
+ // Make sure page is still dirty.
+ if (!cPage.dirty.get()) {
+ return;
+ }
+ boolean cleaned = true;
+ try {
+ write(cPage);
+ } catch (HyracksDataException e) {
+ cleaned = false;
+ }
+ if (cleaned) {
+ cPage.dirty.set(false);
+ cPage.pinCount.decrementAndGet();
+ synchronized (cleanNotification) {
+ ++cleanCount;
+ cleanNotification.notify();
+ }
+ }
+ } finally {
+ if (force) {
+ cPage.latch.writeLock().unlock();
+ } else {
+ cPage.latch.readLock().unlock();
+ }
+ }
+ } else if (shutdownStart) {
+ throw new IllegalStateException(
+ "Cache closed, but unable to acquire read lock on dirty page: " + cPage.dpid);
+ }
+ }
+ }
+
@Override
public synchronized void run() {
try {
while (true) {
- for (int i = 0; i < numPages; ++i) {
- CachedPage cPage = cachedPages[i];
- if (cPage.dirty.get()) {
- if (cPage.latch.readLock().tryLock()) {
- try {
- boolean cleaned = true;
- try {
- write(cPage);
- } catch (HyracksDataException e) {
- cleaned = false;
- }
- if (cleaned) {
- cPage.dirty.set(false);
- cPage.pinCount.decrementAndGet();
- synchronized (cleanNotification) {
- ++cleanCount;
- cleanNotification.notify();
- }
- }
- } finally {
- cPage.latch.readLock().unlock();
- }
- } else if (shutdownStart) {
- throw new IllegalStateException(
- "Cache closed, but unable to acquire read lock on dirty page: " + cPage.dpid);
- }
- }
+ for (int i = 0; i < numPages; ++i) {
+ CachedPage cPage = cachedPages[i];
+ cleanPage(cPage, false);
}
if (shutdownStart) {
break;
@@ -612,7 +632,7 @@
fInfo = fileInfoMap.get(fileId);
if (fInfo == null) {
- // map is full, make room by removing cleaning up unreferenced files
+ // map is full, make room by cleaning up unreferenced files
boolean unreferencedFileFound = true;
while (fileInfoMap.size() >= maxOpenFiles && unreferencedFileFound) {
unreferencedFileFound = false;
@@ -683,7 +703,7 @@
private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage, boolean flushDirtyPages)
throws HyracksDataException {
if (BufferedFileHandle.getFileId(cPage.dpid) == fileId) {
- int pinCount;
+ int pinCount = -1;
if (cPage.dirty.get()) {
if (flushDirtyPages) {
write(cPage);
@@ -694,7 +714,7 @@
pinCount = cPage.pinCount.get();
}
if (pinCount != 0) {
- throw new IllegalStateException("Page is pinned and file is being closed");
+ throw new IllegalStateException("Page is pinned and file is being closed. Pincount is: " + pinCount);
}
cPage.invalidate();
return true;
@@ -726,6 +746,25 @@
}
@Override
+ public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+ // Assumes the caller has pinned the page.
+ cleanerThread.cleanPage((CachedPage)page, true);
+ }
+
+ @Override
+ public void force(int fileId, boolean metadata) throws HyracksDataException {
+ BufferedFileHandle fInfo = null;
+ synchronized (fileInfoMap) {
+ fInfo = fileInfoMap.get(fileId);
+ try {
+ fInfo.getFileHandle().getFileChannel().force(metadata);
+ } catch (IOException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
public synchronized void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Deleting file: " + fileId + " in cache: " + this);
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
index 40afc95..d610c7e 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -155,4 +155,14 @@
public long getCloseFileCount() {
return closeFileCount.get();
}
+
+ @Override
+ public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+ bufferCache.flushDirtyPage(page);
+ }
+
+ @Override
+ public void force(int fileId, boolean metadata) throws HyracksDataException {
+ bufferCache.force(fileId, metadata);
+ }
}
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
index ab0f03c..e8b407e 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
@@ -32,6 +32,10 @@
public void unpin(ICachedPage page) throws HyracksDataException;
+ public void flushDirtyPage(ICachedPage page) throws HyracksDataException;
+
+ public void force(int fileId, boolean metadata) throws HyracksDataException;
+
public int getPageSize();
public int getNumPages();
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
index ba9025f..3484097 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/util/LSMBTreeTestHarness.java
@@ -43,6 +43,7 @@
private static final long RANDOM_SEED = 50;
private static final int DEFAULT_DISK_PAGE_SIZE = 256;
private static final int DEFAULT_DISK_NUM_PAGES = 1000;
+ //private static final int DEFAULT_DISK_NUM_PAGES = 100;
private static final int DEFAULT_DISK_MAX_OPEN_FILES = 200;
private static final int DEFAULT_MEM_PAGE_SIZE = 256;
private static final int DEFAULT_MEM_NUM_PAGES = 100;
@@ -101,7 +102,7 @@
}
public void tearDown() throws HyracksDataException {
- diskBufferCache.close();
+ diskBufferCache.close();
File f = new File(onDiskDir);
// TODO: For some reason the dir fails to be deleted. Ask Vinayak about this.
f.deleteOnExit();
diff --git a/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMTreeFileManagerTest.java b/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMTreeFileManagerTest.java
index 76654db..4ae8734 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMTreeFileManagerTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/LSMTreeFileManagerTest.java
@@ -50,7 +50,7 @@
protected final static String sep = System.getProperty("file.separator");
protected IOManager ioManager;
protected String baseDir;
-
+
@Before
public void setUp() throws HyracksException {
TestStorageManagerComponentHolder.init(DEFAULT_PAGE_SIZE, DEFAULT_NUM_PAGES, DEFAULT_MAX_OPEN_FILES);
@@ -59,7 +59,7 @@
File f = new File(baseDir);
f.mkdirs();
}
-
+
@After
public void tearDown() throws HyracksDataException {
File f = new File(baseDir);
@@ -73,13 +73,13 @@
int numFileNames = 100;
long sleepTime = 5;
for (int i = 0; i < numFileNames; i++) {
- String flushFileName = (String) fileManager.getFlushFileName();
+ String flushFileName = (String) fileManager.getRelFlushFileName();
if (testFlushFileName) {
fileNames.addFirst(flushFileName);
}
Thread.sleep(sleepTime);
if (!testFlushFileName) {
- String secondFlushFileName = (String) fileManager.getFlushFileName();
+ String secondFlushFileName = (String) fileManager.getRelFlushFileName();
String mergeFileName = getMergeFileName(fileManager, flushFileName, secondFlushFileName);
fileNames.addFirst(mergeFileName);
Thread.sleep(sleepTime);
@@ -103,26 +103,25 @@
sortOrderTest(true);
sortOrderTest(false);
}
-
+
public void cleanInvalidFilesTest(IOManager ioManager) throws InterruptedException, IOException {
ILSMFileManager fileManager = new LSMTreeFileManager(ioManager, baseDir);
fileManager.createDirs();
-
+
List<FileReference> flushFiles = new ArrayList<FileReference>();
List<FileReference> allFiles = new ArrayList<FileReference>();
-
+
int numFileNames = 100;
long sleepTime = 5;
// Generate a bunch of flush files.
for (int i = 0; i < numFileNames; i++) {
- FileReference flushTempFile = fileManager.createTempFile();
- String flushFileName = (String) fileManager.getFlushFileName();
- FileReference flushFile = fileManager.rename(flushTempFile, flushFileName);
+ String relFlushFileName = (String) fileManager.getRelFlushFileName();
+ FileReference flushFile = fileManager.createFlushFile(relFlushFileName);
flushFiles.add(flushFile);
- Thread.sleep(sleepTime);
+ Thread.sleep(sleepTime);
}
allFiles.addAll(flushFiles);
-
+
// Simulate merging some of the flush files.
// Merge range 0 to 4.
FileReference mergeFile1 = simulateMerge(fileManager, flushFiles.get(0), flushFiles.get(4));
@@ -139,18 +138,19 @@
// Merge range 50 to 79.
FileReference mergeFile5 = simulateMerge(fileManager, flushFiles.get(50), flushFiles.get(79));
allFiles.add(mergeFile5);
-
+
// Simulate merging of merge files.
FileReference mergeFile6 = simulateMerge(fileManager, mergeFile1, mergeFile2);
allFiles.add(mergeFile6);
FileReference mergeFile7 = simulateMerge(fileManager, mergeFile3, mergeFile4);
allFiles.add(mergeFile7);
-
- // Set delete on exit for all files.
- for (FileReference fileRef : allFiles) {
+
+ // Create all files and set delete on exit for all files.
+ for (FileReference fileRef : allFiles) {
+ fileRef.getFile().createNewFile();
fileRef.getFile().deleteOnExit();
}
-
+
// Populate expected valid flush files.
List<String> expectedValidFiles = new ArrayList<String>();
for (int i = 30; i < 50; i++) {
@@ -159,7 +159,7 @@
for (int i = 80; i < 100; i++) {
expectedValidFiles.add(flushFiles.get(i).getFile().getName());
}
-
+
// Populate expected valid merge files.
expectedValidFiles.add(mergeFile5.getFile().getName());
expectedValidFiles.add(mergeFile6.getFile().getName());
@@ -167,19 +167,20 @@
// Sort expected files.
Collections.sort(expectedValidFiles, fileManager.getFileNameComparator());
-
+
List<Object> validFiles = fileManager.cleanupAndGetValidFiles();
-
+
// Check actual files against expected files.
assertEquals(expectedValidFiles.size(), validFiles.size());
for (int i = 0; i < expectedValidFiles.size(); i++) {
- File f = new File((String) validFiles.get(i));
+ String fileName = (String) validFiles.get(i);
+ File f = new File(fileName);
assertEquals(expectedValidFiles.get(i), f.getName());
}
-
+
// Make sure invalid files were removed from all IODevices.
ArrayList<String> remainingFiles = new ArrayList<String>();
- for (IODeviceHandle dev : ioManager.getIODevices()) {
+ for(IODeviceHandle dev : ioManager.getIODevices()) {
File dir = new File(dev.getPath(), baseDir);
FilenameFilter filter = new FilenameFilter() {
public boolean accept(File dir, String name) {
@@ -192,38 +193,38 @@
remainingFiles.add(f.getName());
}
}
-
+
Collections.sort(remainingFiles, fileManager.getFileNameComparator());
// Check actual files in directory against expected files.
assertEquals(expectedValidFiles.size(), remainingFiles.size());
for (int i = 0; i < expectedValidFiles.size(); i++) {
- assertEquals(expectedValidFiles.get(i), remainingFiles.get(i));
+ assertEquals(expectedValidFiles.get(i), remainingFiles.get(i));
}
}
-
+
@Test
public void singleIODeviceTest() throws InterruptedException, IOException {
IOManager singleDeviceIOManager = createIOManager(1);
cleanInvalidFilesTest(singleDeviceIOManager);
cleanDirs(singleDeviceIOManager);
}
-
+
@Test
public void twoIODevicesTest() throws InterruptedException, IOException {
IOManager twoDevicesIOManager = createIOManager(2);
cleanInvalidFilesTest(twoDevicesIOManager);
cleanDirs(twoDevicesIOManager);
}
-
+
@Test
public void fourIODevicesTest() throws InterruptedException, IOException {
IOManager fourDevicesIOManager = createIOManager(4);
cleanInvalidFilesTest(fourDevicesIOManager);
cleanDirs(fourDevicesIOManager);
}
-
+
private void cleanDirs(IOManager ioManager) {
- for (IODeviceHandle dev : ioManager.getIODevices()) {
+ for(IODeviceHandle dev : ioManager.getIODevices()) {
File dir = new File(dev.getPath(), baseDir);
FilenameFilter filter = new FilenameFilter() {
public boolean accept(File dir, String name) {
@@ -237,7 +238,7 @@
}
}
}
-
+
private IOManager createIOManager(int numDevices) throws HyracksException {
List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
for (int i = 0; i < numDevices; i++) {
@@ -246,19 +247,16 @@
}
return new IOManager(devices, Executors.newCachedThreadPool());
}
-
- private FileReference simulateMerge(ILSMFileManager fileManager, FileReference a, FileReference b)
- throws HyracksDataException {
- FileReference tempMergeFile = fileManager.createTempFile();
- String mergeFileName = (String) fileManager.getMergeFileName(a.getFile().getName(), b.getFile().getName());
- FileReference mergeFile = fileManager.rename(tempMergeFile, mergeFileName);
+
+ private FileReference simulateMerge(ILSMFileManager fileManager, FileReference a, FileReference b) throws HyracksDataException {
+ String relMergeFileName = (String) fileManager.getRelMergeFileName(a.getFile().getName(), b.getFile().getName());
+ FileReference mergeFile = fileManager.createMergeFile(relMergeFileName);
return mergeFile;
}
-
- private String getMergeFileName(ILSMFileManager fileNameManager, String firstFile, String lastFile)
- throws HyracksDataException {
+
+ private String getMergeFileName(ILSMFileManager fileNameManager, String firstFile, String lastFile) throws HyracksDataException {
File f1 = new File(firstFile);
File f2 = new File(lastFile);
- return (String) fileNameManager.getMergeFileName(f1.getName(), f2.getName());
+ return (String) fileNameManager.getRelMergeFileName(f1.getName(), f2.getName());
}
}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTest.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTest.java
index d244683..ab05a88 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTest.java
@@ -49,7 +49,6 @@
return LSMRTreeTestContext.create(harness.getMemBufferCache(), harness.getMemFreePageManager(),
harness.getIOManager(), harness.getOnDiskDir(), harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes,
valueProviderFactories, numKeys, harness.getFileId());
-
}
@Override
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
index 11e5ba4..c939184 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/LSMRTreeMergeTestDriver.java
@@ -19,7 +19,6 @@
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMTreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.rtree.tests.AbstractRTreeTestContext;
@@ -45,18 +44,18 @@
// We assume all fieldSerdes are of the same type. Check the first one
// to determine which field types to generate.
if (fieldSerdes[0] instanceof IntegerSerializerDeserializer) {
- rTreeTestUtils.insertIntTuples(ctx, numTuplesToInsert, getRandom());
+ rTreeTestUtils.bulkLoadIntTuples(ctx, numTuplesToInsert, getRandom());
} else if (fieldSerdes[0] instanceof DoubleSerializerDeserializer) {
- rTreeTestUtils.insertDoubleTuples(ctx, numTuplesToInsert, getRandom());
+ rTreeTestUtils.bulkLoadDoubleTuples(ctx, numTuplesToInsert, getRandom());
}
int maxTreesToMerge = 3;
for (int i = 0; i < maxTreesToMerge; i++) {
- for (int j = 0; j < i; j++) {
- if (fieldSerdes[0] instanceof IntegerSerializerDeserializer) {
- rTreeTestUtils.bulkLoadIntTuples(ctx, numTuplesToInsert, getRandom());
- } else if (fieldSerdes[0] instanceof UTF8StringSerializerDeserializer) {
- rTreeTestUtils.bulkLoadDoubleTuples(ctx, numTuplesToInsert, getRandom());
+ for (int j = 0; j < i; j++) {
+ if (fieldSerdes[0] instanceof IntegerSerializerDeserializer) {
+ rTreeTestUtils.bulkLoadIntTuples(ctx, numTuplesToInsert, getRandom());
+ } else if (fieldSerdes[0] instanceof DoubleSerializerDeserializer) {
+ rTreeTestUtils.bulkLoadDoubleTuples(ctx, numTuplesToInsert, getRandom());
}
}